Merge branch 'master' into serverkeyformatdoc

This commit is contained in:
Kegsay 2020-10-15 18:12:25 +01:00 committed by GitHub
commit 8fa21c8202
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
51 changed files with 808 additions and 309 deletions

View file

@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/internal/setup/kafka"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
@ -47,6 +48,8 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceQueryAPI {
consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
if err != nil {
@ -86,7 +89,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, appserviceDB,
base.Cfg, consumer, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {

View file

@ -29,6 +29,8 @@ services:
KAFKA_ADVERTISED_HOST_NAME: "kafka"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
ports:
- 9092:9092
depends_on:
- zookeeper
networks:

View file

@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() {
rsAPI.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: ygg.CreateClient(base),
FedClient: federation,
KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
Config: base.Cfg,
AccountDB: accountDB,
Client: ygg.CreateClient(base),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,

View file

@ -15,7 +15,6 @@
package clientapi
import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/api"
@ -24,6 +23,7 @@ import (
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup/kafka"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -36,7 +36,6 @@ import (
func AddPublicRoutes(
router *mux.Router,
cfg *config.ClientAPI,
producer sarama.SyncProducer,
accountsDB accounts.Database,
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
@ -48,6 +47,8 @@ func AddPublicRoutes(
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),

View file

@ -339,12 +339,21 @@ func createRoom(
util.GetLogger(req.Context()).WithError(err).Error("authEvents.AddEvent failed")
return jsonerror.InternalServerError()
}
}
// send events to the room server
if err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
accumulated := gomatrixserverlib.UnwrapEventHeaders(builtEvents)
if err = roomserverAPI.SendEventWithState(
req.Context(),
rsAPI,
&gomatrixserverlib.RespState{
StateEvents: accumulated,
AuthEvents: accumulated,
},
ev.Headered(roomVersion),
nil,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed")
return jsonerror.InternalServerError()
}
}
// TODO(#269): Reserve room alias while we create the room. This stops us

View file

@ -37,7 +37,7 @@ func main() {
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation,
base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
)

View file

@ -139,7 +139,7 @@ func main() {
accountDB := base.Base.CreateAccountsDB()
federation := createFederationClient(base)
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer)
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
@ -169,13 +169,11 @@ func main() {
}
monolith := setup.Monolith{
Config: base.Base.Cfg,
AccountDB: accountDB,
Client: createClient(base),
FedClient: federation,
KeyRing: keyRing,
KafkaConsumer: base.Base.KafkaConsumer,
KafkaProducer: base.Base.KafkaProducer,
Config: base.Base.Cfg,
AccountDB: accountDB,
Client: createClient(base),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,

View file

@ -96,7 +96,7 @@ func main() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
@ -129,13 +129,11 @@ func main() {
rsComponent.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: ygg.CreateClient(base),
FedClient: federation,
KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
Config: base.Cfg,
AccountDB: accountDB,
Client: ygg.CreateClient(base),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,

View file

@ -24,7 +24,7 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck
intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer)
intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient())
intAPI.SetUserAPI(base.UserAPIClient())
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)

View file

@ -108,7 +108,7 @@ func main() {
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationSenderAPI(fsAPI)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
@ -127,13 +127,11 @@ func main() {
}
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: base.CreateClient(),
FedClient: federation,
KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
Config: base.Cfg,
AccountDB: accountDB,
Client: base.CreateClient(),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,

View file

@ -21,6 +21,7 @@ import (
func main() {
cfg := setup.ParseFlags(false)
base := setup.NewBaseDendrite(cfg, "SyncAPI", true)
defer base.Close() // nolint: errcheck
@ -30,7 +31,7 @@ func main() {
rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(
base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI,
base.PublicClientAPIMux, userAPI, rsAPI,
base.KeyServerHTTPClient(),
federation, &cfg.SyncAPI,
)

View file

@ -190,7 +190,7 @@ func main() {
accountDB := base.CreateAccountsDB()
federation := createFederationClient(cfg, node)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
@ -212,13 +212,11 @@ func main() {
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: createClient(node),
FedClient: federation,
KeyRing: &keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
Config: base.Cfg,
AccountDB: accountDB,
Client: createClient(node),
FedClient: federation,
KeyRing: &keyRing,
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,

View file

@ -8,19 +8,38 @@ import (
"log"
"os"
// Example complex Go migration import:
// _ "github.com/matrix-org/dendrite/serverkeyapi/storage/postgres/deltas"
pgaccounts "github.com/matrix-org/dendrite/userapi/storage/accounts/postgres/deltas"
slaccounts "github.com/matrix-org/dendrite/userapi/storage/accounts/sqlite3/deltas"
pgdevices "github.com/matrix-org/dendrite/userapi/storage/devices/postgres/deltas"
sldevices "github.com/matrix-org/dendrite/userapi/storage/devices/sqlite3/deltas"
"github.com/pressly/goose"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
)
var (
flags = flag.NewFlagSet("goose", flag.ExitOnError)
dir = flags.String("dir", ".", "directory with migration files")
const (
AppService = "appservice"
FederationSender = "federationsender"
KeyServer = "keyserver"
MediaAPI = "mediaapi"
RoomServer = "roomserver"
SigningKeyServer = "signingkeyserver"
SyncAPI = "syncapi"
UserAPIAccounts = "userapi_accounts"
UserAPIDevices = "userapi_devices"
)
var (
dir = flags.String("dir", "", "directory with migration files")
flags = flag.NewFlagSet("goose", flag.ExitOnError)
component = flags.String("component", "", "dendrite component name")
knownDBs = []string{
AppService, FederationSender, KeyServer, MediaAPI, RoomServer, SigningKeyServer, SyncAPI, UserAPIAccounts, UserAPIDevices,
}
)
// nolint: gocyclo
func main() {
err := flags.Parse(os.Args[1:])
if err != nil {
@ -37,19 +56,20 @@ Drivers:
sqlite3
Examples:
goose -d roomserver/storage/sqlite3/deltas sqlite3 ./roomserver.db status
goose -d roomserver/storage/sqlite3/deltas sqlite3 ./roomserver.db up
goose -component roomserver sqlite3 ./roomserver.db status
goose -component roomserver sqlite3 ./roomserver.db up
goose -d roomserver/storage/postgres/deltas postgres "user=dendrite dbname=dendrite sslmode=disable" status
goose -component roomserver postgres "user=dendrite dbname=dendrite sslmode=disable" status
Options:
-dir string
directory with migration files (default ".")
-component string
Dendrite component name e.g roomserver, signingkeyserver, clientapi, syncapi
-table string
migrations table name (default "goose_db_version")
-h print help
-v enable verbose mode
-dir string
directory with migration files, only relevant when creating new migrations.
-version
print version
@ -74,6 +94,25 @@ Commands:
fmt.Println("engine must be one of 'sqlite3' or 'postgres'")
return
}
knownComponent := false
for _, c := range knownDBs {
if c == *component {
knownComponent = true
break
}
}
if !knownComponent {
fmt.Printf("component must be one of %v\n", knownDBs)
return
}
if engine == "sqlite3" {
loadSQLiteDeltas(*component)
} else {
loadPostgresDeltas(*component)
}
dbstring, command := args[1], args[2]
db, err := goose.OpenDBWithDriver(engine, dbstring)
@ -92,7 +131,30 @@ Commands:
arguments = append(arguments, args[3:]...)
}
if err := goose.Run(command, db, *dir, arguments...); err != nil {
// goose demands a directory even though we don't use it for upgrades
d := *dir
if d == "" {
d = os.TempDir()
}
if err := goose.Run(command, db, d, arguments...); err != nil {
log.Fatalf("goose %v: %v", command, err)
}
}
func loadSQLiteDeltas(component string) {
switch component {
case UserAPIAccounts:
slaccounts.LoadFromGoose()
case UserAPIDevices:
sldevices.LoadFromGoose()
}
}
func loadPostgresDeltas(component string) {
switch component {
case UserAPIAccounts:
pgaccounts.LoadFromGoose()
case UserAPIDevices:
pgdevices.LoadFromGoose()
}
}

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/eduserver/inthttp"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/internal/setup/kafka"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
@ -41,10 +42,13 @@ func NewInternalAPI(
userAPI userapi.UserInternalAPI,
) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
return &input.EDUServerInputAPI{
Cache: eduCache,
UserAPI: userAPI,
Producer: base.KafkaProducer,
Producer: producer,
OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
ServerName: cfg.Matrix.ServerName,

View file

@ -289,6 +289,15 @@ func (t *txnReq) processEDUs(ctx context.Context) {
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event")
continue
}
_, domain, err := gomatrixserverlib.SplitID('@', typingPayload.UserID)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to split domain from typing event sender")
continue
}
if domain != t.Origin {
util.GetLogger(ctx).Warnf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
continue
}
if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
}
@ -467,6 +476,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState)
}
// nolint:gocyclo
func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
@ -504,36 +514,70 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
backwardsExtremity := &newEvents[0]
newEvents = newEvents[1:]
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
// already in QueryStateAfterEvent.
trustworthy bool
*gomatrixserverlib.RespState
}
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
var states []*gomatrixserverlib.RespState
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
var states []*respState
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
var prevState *gomatrixserverlib.RespState
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
if err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return err
prevState, trustworthy, lerr := t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID)
if lerr != nil {
util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return lerr
}
// Append the state onto the collected state. We'll run this through the
// state resolution next.
states = append(states, prevState)
states = append(states, &respState{trustworthy, prevState})
}
// Now that we have collected all of the state from the prev_events, we'll
// run the state through the appropriate state resolution algorithm for the
// room. This does a couple of things:
// room if needed. This does a couple of things:
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
if err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
return err
resolvedState := &gomatrixserverlib.RespState{}
switch len(states) {
case 0:
extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
if !extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
util.GetLogger(ctx).Errorf("Failed to lookup any state after prev_events")
return fmt.Errorf("expected %d states but got %d", len(backwardsExtremity.PrevEventIDs()), len(states))
}
case 1:
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again.
if states[0].trustworthy {
resolvedState = states[0].RespState
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default:
respStates := make([]*gomatrixserverlib.RespState, len(states))
for i := range states {
respStates[i] = states[i].RespState
}
// There's more than one previous state - run them all through state res
resolvedState, err = t.resolveStatesAndCheck(gmectx, roomVersion, respStates, backwardsExtremity)
if err != nil {
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
return err
}
}
// First of all, send the backward extremity into the roomserver with the
@ -573,16 +617,16 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) {
func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) {
// try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID, needed)
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
if respState != nil {
return respState, nil
return respState, true, nil
}
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
if err != nil {
return nil, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
}
servers := t.getServers(ctx, roomID)
@ -594,11 +638,11 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
switch err.(type) {
case verifySigError:
return respState, nil
return respState, false, nil
case nil:
// do nothing
default:
return nil, fmt.Errorf("t.lookupEvent: %w", err)
return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
}
t.haveEvents[h.EventID()] = h
if h.StateKey() != nil {
@ -616,15 +660,14 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
}
}
return respState, nil
return respState, false, nil
}
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState {
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
var res api.QueryStateAfterEventsResponse
err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
RoomID: roomID,
PrevEventIDs: []string{eventID},
StateToFetch: needed,
}, &res)
if err != nil || !res.PrevEventsExist {
util.GetLogger(ctx).WithError(err).Warnf("failed to query state after %s locally", eventID)

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/internal/setup/kafka"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@ -55,6 +56,8 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
queues := queue.NewOutgoingQueues(
federationSenderDB, cfg.Matrix.ServerName, federation,
rsAPI, stats,
@ -66,7 +69,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
cfg, base.KafkaConsumer, queues,
cfg, consumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@ -74,13 +77,13 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
cfg, base.KafkaConsumer, queues, federationSenderDB,
cfg, consumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
&base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI,
&base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")

2
go.mod
View file

@ -22,7 +22,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20201009153043-8d27a9f0e350
github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.2

4
go.sum
View file

@ -569,8 +569,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20201009153043-8d27a9f0e350 h1:G9K8k5KIzbeBdd0bMk+4itdZU3JGHgV+z0FNUsTEhkE=
github.com/matrix-org/gomatrixserverlib v0.0.0-20201009153043-8d27a9f0e350/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8 h1:GF1PxbvImWDoz1DQZNMoaYtIqQXtyLAtmQOzwwmw1OI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20201015151920-aa4f62b827b8/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=

View file

@ -26,13 +26,9 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/matrix-org/naffka"
naffkaStorage "github.com/matrix-org/naffka/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
@ -73,8 +69,8 @@ type BaseDendrite struct {
httpClient *http.Client
Cfg *config.Dendrite
Caches *caching.Caches
KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer
// KafkaConsumer sarama.Consumer
// KafkaProducer sarama.SyncProducer
}
const HTTPServerTimeout = time.Minute * 5
@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
logrus.WithError(err).Panicf("failed to start opentracing")
}
var kafkaConsumer sarama.Consumer
var kafkaProducer sarama.SyncProducer
if cfg.Global.Kafka.UseNaffka {
kafkaConsumer, kafkaProducer = setupNaffka(cfg)
} else {
kafkaConsumer, kafkaProducer = setupKafka(cfg)
}
cache, err := caching.NewInMemoryLRUCache(true)
if err != nil {
logrus.WithError(err).Warnf("Failed to create cache")
@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient,
httpClient: &client,
KafkaConsumer: kafkaConsumer,
KafkaProducer: kafkaProducer,
}
}
@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP(
select {}
}
// setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil)
if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer")
}
producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil)
if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers")
}
return consumer, producer
}
// setupNaffka creates kafka consumer/producer pair from the config.
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
naff, err := naffka.New(naffkaDB)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka")
}
return naff, naff
}

View file

@ -0,0 +1,53 @@
package kafka
import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/naffka"
naffkaStorage "github.com/matrix-org/naffka/storage"
"github.com/sirupsen/logrus"
)
func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
if cfg.UseNaffka {
return setupNaffka(cfg)
}
return setupKafka(cfg)
}
// setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer")
}
producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers")
}
return consumer, producer
}
// In monolith mode with Naffka, we don't have the same constraints about
// consuming the same topic from more than one place like we do with Kafka.
// Therefore, we will only open one Naffka connection in case Naffka is
// running on SQLite.
var naffkaInstance *naffka.Naffka
// setupNaffka creates kafka consumer/producer pair from the config.
func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
if naffkaInstance != nil {
return naffkaInstance, naffkaInstance
}
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
naffkaInstance, err = naffka.New(naffkaDB)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka")
}
return naffkaInstance, naffkaInstance
}

View file

@ -15,7 +15,6 @@
package setup
import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi"
@ -38,13 +37,11 @@ import (
// Monolith represents an instantiation of all dependencies required to build
// all components of Dendrite, for use in monolith mode.
type Monolith struct {
Config *config.Dendrite
AccountDB accounts.Database
KeyRing *gomatrixserverlib.KeyRing
Client *gomatrixserverlib.Client
FedClient *gomatrixserverlib.FederationClient
KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer
Config *config.Dendrite
AccountDB accounts.Database
KeyRing *gomatrixserverlib.KeyRing
Client *gomatrixserverlib.Client
FedClient *gomatrixserverlib.FederationClient
AppserviceAPI appserviceAPI.AppServiceQueryAPI
EDUInternalAPI eduServerAPI.EDUServerInputAPI
@ -61,7 +58,7 @@ type Monolith struct {
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
clientapi.AddPublicRoutes(
csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
csMux, &m.Config.ClientAPI, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}

130
internal/sqlutil/migrate.go Normal file
View file

@ -0,0 +1,130 @@
package sqlutil
import (
"database/sql"
"fmt"
"runtime"
"sort"
"github.com/matrix-org/dendrite/internal/config"
"github.com/pressly/goose"
)
type Migrations struct {
registeredGoMigrations map[int64]*goose.Migration
}
func NewMigrations() *Migrations {
return &Migrations{
registeredGoMigrations: make(map[int64]*goose.Migration),
}
}
// Copy-pasted from goose directly to store migrations into a map we control
// AddMigration adds a migration.
func (m *Migrations) AddMigration(up func(*sql.Tx) error, down func(*sql.Tx) error) {
_, filename, _, _ := runtime.Caller(1)
m.AddNamedMigration(filename, up, down)
}
// AddNamedMigration : Add a named migration.
func (m *Migrations) AddNamedMigration(filename string, up func(*sql.Tx) error, down func(*sql.Tx) error) {
v, _ := goose.NumericComponent(filename)
migration := &goose.Migration{Version: v, Next: -1, Previous: -1, Registered: true, UpFn: up, DownFn: down, Source: filename}
if existing, ok := m.registeredGoMigrations[v]; ok {
panic(fmt.Sprintf("failed to add migration %q: version conflicts with %q", filename, existing.Source))
}
m.registeredGoMigrations[v] = migration
}
// RunDeltas up to the latest version.
func (m *Migrations) RunDeltas(db *sql.DB, props *config.DatabaseOptions) error {
maxVer := goose.MaxVersion
minVer := int64(0)
migrations, err := m.collect(minVer, maxVer)
if err != nil {
return fmt.Errorf("RunDeltas: Failed to collect migrations: %w", err)
}
if props.ConnectionString.IsPostgres() {
if err = goose.SetDialect("postgres"); err != nil {
return err
}
} else if props.ConnectionString.IsSQLite() {
if err = goose.SetDialect("sqlite3"); err != nil {
return err
}
} else {
return fmt.Errorf("Unknown connection string: %s", props.ConnectionString)
}
for {
current, err := goose.EnsureDBVersion(db)
if err != nil {
return fmt.Errorf("RunDeltas: Failed to EnsureDBVersion: %w", err)
}
next, err := migrations.Next(current)
if err != nil {
if err == goose.ErrNoNextVersion {
return nil
}
return fmt.Errorf("RunDeltas: Failed to load next migration to %+v : %w", next, err)
}
if err = next.Up(db); err != nil {
return fmt.Errorf("RunDeltas: Failed run migration: %w", err)
}
}
}
func (m *Migrations) collect(current, target int64) (goose.Migrations, error) {
var migrations goose.Migrations
// Go migrations registered via goose.AddMigration().
for _, migration := range m.registeredGoMigrations {
v, err := goose.NumericComponent(migration.Source)
if err != nil {
return nil, err
}
if versionFilter(v, current, target) {
migrations = append(migrations, migration)
}
}
migrations = sortAndConnectMigrations(migrations)
return migrations, nil
}
func sortAndConnectMigrations(migrations goose.Migrations) goose.Migrations {
sort.Sort(migrations)
// now that we're sorted in the appropriate direction,
// populate next and previous for each migration
for i, m := range migrations {
prev := int64(-1)
if i > 0 {
prev = migrations[i-1].Version
migrations[i-1].Next = m.Version
}
migrations[i].Previous = prev
}
return migrations
}
func versionFilter(v, current, target int64) bool {
if target > current {
return v > current && v <= target
}
if target < current {
return v <= current && v > target
}
return false
}

View file

@ -1,6 +1,12 @@
package internal
import "fmt"
import (
"fmt"
"strings"
)
// the final version string
var version string
// -ldflags "-X github.com/matrix-org/dendrite/internal.branch=master"
var branch string
@ -16,12 +22,22 @@ const (
)
func VersionString() string {
version := fmt.Sprintf("%d.%d.%d%s", VersionMajor, VersionMinor, VersionPatch, VersionTag)
if branch != "" {
version += fmt.Sprintf("-%s", branch)
}
if build != "" {
version += fmt.Sprintf("+%s", build)
}
return version
}
func init() {
version = fmt.Sprintf("%d.%d.%d", VersionMajor, VersionMinor, VersionPatch)
if VersionTag != "" {
version += "-" + VersionTag
}
parts := []string{}
if build != "" {
parts = append(parts, build)
}
if branch != "" {
parts = append(parts, branch)
}
if len(parts) > 0 {
version += "+" + strings.Join(parts, ".")
}
}

View file

@ -15,10 +15,10 @@
package keyserver
import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup/kafka"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer,
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")

View file

@ -63,7 +63,8 @@ type QueryStateAfterEventsRequest struct {
RoomID string `json:"room_id"`
// The list of previous events to return the events after.
PrevEventIDs []string `json:"prev_event_ids"`
// The state key tuples to fetch from the state
// The state key tuples to fetch from the state. If none are specified then
// the entire resolved room state will be returned.
StateToFetch []gomatrixserverlib.StateKeyTuple `json:"state_to_fetch"`
}

View file

@ -133,8 +133,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// If the event has already been written to the output log then we
// don't need to do anything, as we've handled it already.
hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID)
if err != nil {
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
} else if hasBeenSent {
return nil
@ -142,17 +141,19 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// Work out what the latest events are. This will include the new
// event if it is not already referenced.
u.calculateLatest(
if err := u.calculateLatest(
oldLatest,
types.StateAtEventAndReference{
EventReference: u.event.EventReference(),
StateAtEvent: u.stateAtEvent,
},
)
); err != nil {
return fmt.Errorf("u.calculateLatest: %w", err)
}
// Now that we know what the latest events are, it's time to get the
// latest state.
if err = u.latestState(); err != nil {
if err := u.latestState(); err != nil {
return fmt.Errorf("u.latestState: %w", err)
}
@ -261,7 +262,7 @@ func (u *latestEventsUpdater) latestState() error {
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
newEvent types.StateAtEventAndReference,
) {
) error {
var newLatest []types.StateAtEventAndReference
// First of all, let's see if any of the existing forward extremities
@ -271,6 +272,7 @@ func (u *latestEventsUpdater) calculateLatest(
referenced, err := u.updater.IsReferenced(l.EventReference)
if err != nil {
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID)
return fmt.Errorf("u.updater.IsReferenced (old): %w", err)
} else if !referenced {
newLatest = append(newLatest, l)
}
@ -285,7 +287,7 @@ func (u *latestEventsUpdater) calculateLatest(
// We've already referenced this new event so we can just return
// the newly completed extremities at this point.
u.latest = newLatest
return
return nil
}
}
@ -296,11 +298,13 @@ func (u *latestEventsUpdater) calculateLatest(
referenced, err := u.updater.IsReferenced(newEvent.EventReference)
if err != nil {
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID)
return fmt.Errorf("u.updater.IsReferenced (new): %w", err)
} else if !referenced || len(newLatest) == 0 {
newLatest = append(newLatest, newEvent)
}
u.latest = newLatest
return nil
}
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {

View file

@ -49,6 +49,7 @@ func (r *Queryer) QueryLatestEventsAndState(
}
// QueryStateAfterEvents implements api.RoomserverInternalAPI
// nolint:gocyclo
func (r *Queryer) QueryStateAfterEvents(
ctx context.Context,
request *api.QueryStateAfterEventsRequest,
@ -78,10 +79,18 @@ func (r *Queryer) QueryStateAfterEvents(
}
response.PrevEventsExist = true
// Look up the currrent state for the requested tuples.
stateEntries, err := roomState.LoadStateAfterEventsForStringTuples(
ctx, prevStates, request.StateToFetch,
)
var stateEntries []types.StateEntry
if len(request.StateToFetch) == 0 {
// Look up all of the current room state.
stateEntries, err = roomState.LoadCombinedStateAfterEvents(
ctx, prevStates,
)
} else {
// Look up the current state for the requested tuples.
stateEntries, err = roomState.LoadStateAfterEventsForStringTuples(
ctx, prevStates, request.StateToFetch,
)
}
if err != nil {
return err
}
@ -91,6 +100,24 @@ func (r *Queryer) QueryStateAfterEvents(
return err
}
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 {
var authEventIDs []string
for _, e := range stateEvents {
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
}
authEventIDs = util.UniqueStrings(authEventIDs)
authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil {
return fmt.Errorf("getAuthChain: %w", err)
}
stateEvents, err = state.ResolveConflictsAdhoc(info.RoomVersion, stateEvents, authEvents)
if err != nil {
return fmt.Errorf("state.ResolveConflictsAdhoc: %w", err)
}
}
for _, event := range stateEvents {
response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
}

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/internal/setup/kafka"
"github.com/matrix-org/dendrite/roomserver/internal"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/sirupsen/logrus"
@ -41,6 +42,8 @@ func NewInternalAPI(
) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
@ -52,7 +55,7 @@ func NewInternalAPI(
}
return internal.NewRoomserverAPI(
cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
base.Caches, keyRing, perspectiveServerNames,
)
}

View file

@ -17,7 +17,10 @@ import (
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/internal/test"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
const (
@ -160,7 +163,9 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
cfg.Defaults()
cfg.Global.ServerName = testOrigin
cfg.Global.Kafka.UseNaffka = true
cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI)
cfg.RoomServer.Database = config.DatabaseOptions{
ConnectionString: roomserverDBFileURI,
}
dp := &dummyProducer{
topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
}
@ -169,12 +174,17 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
t.Fatalf("failed to make caches: %s", err)
}
base := &setup.BaseDendrite{
KafkaProducer: dp,
Caches: cache,
Cfg: cfg,
Caches: cache,
Cfg: cfg,
}
return NewInternalAPI(base, &test.NopJSONVerifier{}), dp
roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to room server db")
}
return internal.NewRoomserverAPI(
&cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)),
base.Caches, &test.NopJSONVerifier{}, nil,
), dp
}
func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) {

View file

@ -39,7 +39,8 @@ CREATE INDEX IF NOT EXISTS roomserver_redactions_redacts_event_id ON roomserver_
const insertRedactionSQL = "" +
"INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
" VALUES ($1, $2, $3)"
" VALUES ($1, $2, $3)" +
" ON CONFLICT DO NOTHING"
const selectRedactionInfoByRedactionEventIDSQL = "" +
"SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" +

View file

@ -18,23 +18,30 @@ type LatestEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID
}
func rollback(txn *sql.Tx) {
if txn == nil {
return
}
txn.Rollback() // nolint: errcheck
}
func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomInfo types.RoomInfo) (*LatestEventsUpdater, error) {
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomInfo.RoomNID)
if err != nil {
txn.Rollback() // nolint: errcheck
rollback(txn)
return nil, err
}
stateAndRefs, err := d.EventsTable.BulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
if err != nil {
txn.Rollback() // nolint: errcheck
rollback(txn)
return nil, err
}
var lastEventIDSent string
if lastEventNIDSent != 0 {
lastEventIDSent, err = d.EventsTable.SelectEventID(ctx, txn, lastEventNIDSent)
if err != nil {
txn.Rollback() // nolint: errcheck
rollback(txn)
return nil, err
}
}

View file

@ -458,7 +458,7 @@ func (d *Database) StoreEvent(
eventNID, stateNID, err = d.EventsTable.SelectEvent(ctx, txn, event.EventID())
}
if err != nil {
return err
return fmt.Errorf("d.EventsTable.SelectEvent: %w", err)
}
}
@ -467,6 +467,9 @@ func (d *Database) StoreEvent(
}
if !isRejected { // ignore rejected redaction events
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
if err != nil {
return fmt.Errorf("d.handleRedactions: %w", err)
}
}
return nil
})
@ -627,6 +630,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) (
// to cross-reference with other tables when loading.
//
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
// nolint:gocyclo
func (d *Database) handleRedactions(
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event,
) (*gomatrixserverlib.Event, string, error) {
@ -644,13 +648,13 @@ func (d *Database) handleRedactions(
RedactsEventID: event.Redacts(),
})
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("d.RedactionsTable.InsertRedaction: %w", err)
}
}
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event)
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err)
}
if validated || redactedEvent == nil || redactionEvent == nil {
// we've seen this redaction before or there is nothing to redact
@ -664,7 +668,7 @@ func (d *Database) handleRedactions(
// mark the event as redacted
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("redactedEvent.SetUnsignedField: %w", err)
}
if redactionsArePermanent {
redactedEvent.Event = redactedEvent.Redact()
@ -672,10 +676,15 @@ func (d *Database) handleRedactions(
// overwrite the eventJSON table
err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err)
}
return &redactionEvent.Event, redactedEvent.EventID(), d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true)
err = d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true)
if err != nil {
err = fmt.Errorf("d.RedactionsTable.MarkRedactionValidated: %w", err)
}
return &redactionEvent.Event, redactedEvent.EventID(), err
}
// loadRedactionPair returns both the redaction event and the redacted event, else nil.

View file

@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS roomserver_redactions (
`
const insertRedactionSQL = "" +
"INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
"INSERT OR IGNORE INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" +
" VALUES ($1, $2, $3)"
const selectRedactionInfoByRedactionEventIDSQL = "" +

View file

@ -17,11 +17,11 @@ package syncapi
import (
"context"
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/setup/kafka"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -37,13 +37,14 @@ import (
// component.
func AddPublicRoutes(
router *mux.Router,
consumer sarama.Consumer,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")

View file

@ -482,3 +482,5 @@ m.room.history_visibility == "joined" allows/forbids appropriately for Real user
POST rejects invalid utf-8 in JSON
Users cannot kick users who have already left a room
A prev_batch token from incremental sync can be used in the v1 messages API
Event with an invalid signature in the send_join response should not cause room join to fail
Inbound federation rejects typing notifications from wrong remote

View file

@ -75,11 +75,12 @@ type accountsStatements struct {
serverName gomatrixserverlib.ServerName
}
func (s *accountsStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(accountsSchema)
return err
}
func (s *accountsStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) {
_, err = db.Exec(accountsSchema)
if err != nil {
return
}
if s.insertAccountStmt, err = db.Prepare(insertAccountSQL); err != nil {
return
}

View file

@ -0,0 +1,33 @@
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpIsActive, DownIsActive)
}
func LoadIsActive(m *sqlutil.Migrations) {
m.AddMigration(UpIsActive, DownIsActive)
}
func UpIsActive(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS is_deactivated BOOLEAN DEFAULT FALSE;")
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownIsActive(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE account_accounts DROP COLUMN is_deactivated;")
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -1,9 +0,0 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE account_accounts ADD COLUMN IF NOT EXISTS is_deactivated BOOLEAN DEFAULT FALSE;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE account_accounts DROP COLUMN is_deactivated;
-- +goose StatementEnd

View file

@ -25,6 +25,8 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts/postgres/deltas"
_ "github.com/matrix-org/dendrite/userapi/storage/accounts/postgres/deltas"
"github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt"
@ -55,6 +57,18 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
db: db,
writer: sqlutil.NewDummyWriter(),
}
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
if err = d.accounts.execSchema(db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadIsActive(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "account"); err != nil {
return nil, err
}
@ -70,6 +84,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
if err = d.threepids.prepare(db); err != nil {
return nil, err
}
return d, nil
}

View file

@ -74,13 +74,13 @@ type accountsStatements struct {
serverName gomatrixserverlib.ServerName
}
func (s *accountsStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(accountsSchema)
return err
}
func (s *accountsStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) {
s.db = db
_, err = db.Exec(accountsSchema)
if err != nil {
return
}
if s.insertAccountStmt, err = db.Prepare(insertAccountSQL); err != nil {
return
}

View file

@ -0,0 +1,64 @@
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpIsActive, DownIsActive)
}
func LoadIsActive(m *sqlutil.Migrations) {
m.AddMigration(UpIsActive, DownIsActive)
}
func UpIsActive(tx *sql.Tx) error {
_, err := tx.Exec(`
ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL,
password_hash TEXT,
appservice_id TEXT,
is_deactivated BOOLEAN DEFAULT 0
);
INSERT
INTO account_accounts (
localpart, created_ts, password_hash, appservice_id
) SELECT
localpart, created_ts, password_hash, appservice_id
FROM account_accounts_tmp
;
DROP TABLE account_accounts_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownIsActive(tx *sql.Tx) error {
_, err := tx.Exec(`
ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL,
password_hash TEXT,
appservice_id TEXT
);
INSERT
INTO account_accounts (
localpart, created_ts, password_hash, appservice_id
) SELECT
localpart, created_ts, password_hash, appservice_id
FROM account_accounts_tmp
;
DROP TABLE account_accounts_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -1,38 +0,0 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL,
password_hash TEXT,
appservice_id TEXT,
is_deactivated BOOLEAN DEFAULT 0
);
INSERT
INTO account_accounts (
localpart, created_ts, password_hash, appservice_id
) SELECT
localpart, created_ts, password_hash, appservice_id
FROM account_accounts_tmp
;
DROP TABLE account_accounts_tmp;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE account_accounts RENAME TO account_accounts_tmp;
CREATE TABLE account_accounts (
localpart TEXT NOT NULL PRIMARY KEY,
created_ts BIGINT NOT NULL,
password_hash TEXT,
appservice_id TEXT
);
INSERT
INTO account_accounts (
localpart, created_ts, password_hash, appservice_id
) SELECT
localpart, created_ts, password_hash, appservice_id
FROM account_accounts_tmp
;
DROP TABLE account_accounts_tmp;
-- +goose StatementEnd

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts/sqlite3/deltas"
"github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/bcrypt"
// Import the sqlite3 database driver.
@ -60,6 +61,18 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
db: db,
writer: sqlutil.NewExclusiveWriter(),
}
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
if err = d.accounts.execSchema(db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadIsActive(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
partitions := sqlutil.PartitionOffsetStatements{}
if err = partitions.Prepare(db, d.writer, "account"); err != nil {
return nil, err
@ -76,6 +89,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
if err = d.threepids.prepare(db); err != nil {
return nil, err
}
return d, nil
}

View file

@ -0,0 +1,39 @@
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP)
}
func LoadLastSeenTSIP(m *sqlutil.Migrations) {
m.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP)
}
func UpLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS last_seen_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)*1000;
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS ip TEXT;
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS user_agent TEXT;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
ALTER TABLE device_devices DROP COLUMN last_seen_ts;
ALTER TABLE device_devices DROP COLUMN ip;
ALTER TABLE device_devices DROP COLUMN user_agent;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -1,13 +0,0 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS last_seen_ts BIGINT NOT NULL DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)*1000;
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS ip TEXT;
ALTER TABLE device_devices ADD COLUMN IF NOT EXISTS user_agent TEXT;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE device_devices DROP COLUMN last_seen_ts;
ALTER TABLE device_devices DROP COLUMN ip;
ALTER TABLE device_devices DROP COLUMN user_agent;
-- +goose StatementEnd

View file

@ -111,11 +111,12 @@ type devicesStatements struct {
serverName gomatrixserverlib.ServerName
}
func (s *devicesStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(devicesSchema)
return err
}
func (s *devicesStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) {
_, err = db.Exec(devicesSchema)
if err != nil {
return
}
if s.insertDeviceStmt, err = db.Prepare(insertDeviceSQL); err != nil {
return
}

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/devices/postgres/deltas"
"github.com/matrix-org/gomatrixserverlib"
)
@ -42,9 +43,22 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
return nil, err
}
d := devicesStatements{}
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
if err = d.execSchema(db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadLastSeenTSIP(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
if err = d.prepare(db, serverName); err != nil {
return nil, err
}
return &Database{db, d}, nil
}

View file

@ -0,0 +1,70 @@
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP)
}
func LoadLastSeenTSIP(m *sqlutil.Migrations) {
m.AddMigration(UpLastSeenTSIP, DownLastSeenTSIP)
}
func UpLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
ALTER TABLE device_devices RENAME TO device_devices_tmp;
CREATE TABLE device_devices (
access_token TEXT PRIMARY KEY,
session_id INTEGER,
device_id TEXT ,
localpart TEXT ,
created_ts BIGINT,
display_name TEXT,
last_seen_ts BIGINT,
ip TEXT,
user_agent TEXT,
UNIQUE (localpart, device_id)
);
INSERT
INTO device_devices (
access_token, session_id, device_id, localpart, created_ts, display_name, last_seen_ts, ip, user_agent
) SELECT
access_token, session_id, device_id, localpart, created_ts, display_name, created_ts, '', ''
FROM device_devices_tmp;
DROP TABLE device_devices_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownLastSeenTSIP(tx *sql.Tx) error {
_, err := tx.Exec(`
ALTER TABLE device_devices RENAME TO device_devices_tmp;
CREATE TABLE IF NOT EXISTS device_devices (
access_token TEXT PRIMARY KEY,
session_id INTEGER,
device_id TEXT ,
localpart TEXT ,
created_ts BIGINT,
display_name TEXT,
UNIQUE (localpart, device_id)
);
INSERT
INTO device_devices (
access_token, session_id, device_id, localpart, created_ts, display_name
) SELECT
access_token, session_id, device_id, localpart, created_ts, display_name
FROM device_devices_tmp;
DROP TABLE device_devices_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -1,44 +0,0 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE device_devices RENAME TO device_devices_tmp;
CREATE TABLE device_devices (
access_token TEXT PRIMARY KEY,
session_id INTEGER,
device_id TEXT ,
localpart TEXT ,
created_ts BIGINT,
display_name TEXT,
last_seen_ts BIGINT,
ip TEXT,
user_agent TEXT,
UNIQUE (localpart, device_id)
);
INSERT
INTO device_devices (
access_token, session_id, device_id, localpart, created_ts, display_name, last_seen_ts, ip, user_agent
) SELECT
access_token, session_id, device_id, localpart, created_ts, display_name, created_ts, '', ''
FROM device_devices_tmp;
DROP TABLE device_devices_tmp;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE device_devices RENAME TO device_devices_tmp;
CREATE TABLE IF NOT EXISTS device_devices (
access_token TEXT PRIMARY KEY,
session_id INTEGER,
device_id TEXT ,
localpart TEXT ,
created_ts BIGINT,
display_name TEXT,
UNIQUE (localpart, device_id)
);
INSERT
INTO device_devices (
access_token, session_id, device_id, localpart, created_ts, display_name
) SELECT
access_token, session_id, device_id, localpart, created_ts, display_name
FROM device_devices_tmp;
DROP TABLE device_devices_tmp;
-- +goose StatementEnd

View file

@ -98,13 +98,14 @@ type devicesStatements struct {
serverName gomatrixserverlib.ServerName
}
func (s *devicesStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(devicesSchema)
return err
}
func (s *devicesStatements) prepare(db *sql.DB, writer sqlutil.Writer, server gomatrixserverlib.ServerName) (err error) {
s.db = db
s.writer = writer
_, err = db.Exec(devicesSchema)
if err != nil {
return
}
if s.insertDeviceStmt, err = db.Prepare(insertDeviceSQL); err != nil {
return
}

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/devices/sqlite3/deltas"
"github.com/matrix-org/gomatrixserverlib"
_ "github.com/mattn/go-sqlite3"
@ -46,6 +47,17 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
}
writer := sqlutil.NewExclusiveWriter()
d := devicesStatements{}
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
if err = d.execSchema(db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadLastSeenTSIP(m)
if err = m.RunDeltas(db, dbProperties); err != nil {
return nil, err
}
if err = d.prepare(db, writer, serverName); err != nil {
return nil, err
}