Remove unused config for kafka topic

This commit is contained in:
Mark Haines 2017-07-17 14:15:07 +01:00
parent 5a536aaf18
commit 4eec8da6b5
4 changed files with 8 additions and 28 deletions

View file

@ -19,13 +19,14 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/ed25519"
"gopkg.in/yaml.v2"
"io/ioutil" "io/ioutil"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"golang.org/x/crypto/ed25519"
"gopkg.in/yaml.v2"
) )
// Version is the current version of the config format. // Version is the current version of the config format.
@ -95,8 +96,6 @@ type Dendrite struct {
Addresses []string `yaml:"addresses"` Addresses []string `yaml:"addresses"`
// The names of the topics to use when reading and writing from kafka. // The names of the topics to use when reading and writing from kafka.
Topics struct { Topics struct {
// Topic for roomserver/api.InputRoomEvent events.
InputRoomEvent Topic `yaml:"input_room_event"`
// Topic for roomserver/api.OutputRoomEvent events. // Topic for roomserver/api.OutputRoomEvent events.
OutputRoomEvent Topic `yaml:"output_room_event"` OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for user updates (profile, presence) // Topic for user updates (profile, presence)
@ -298,7 +297,6 @@ func (config *Dendrite) check() error {
} }
checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent))
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.account", string(config.Database.Account))
checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.device", string(config.Database.Device))

View file

@ -21,14 +21,15 @@ import (
"encoding/base64" "encoding/base64"
"encoding/pem" "encoding/pem"
"fmt" "fmt"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib"
"gopkg.in/yaml.v2"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib"
"gopkg.in/yaml.v2"
) )
const ( const (
@ -80,7 +81,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Kafka.Addresses = []string{kafkaURI} cfg.Kafka.Addresses = []string{kafkaURI}
// TODO: Different servers should be using different topics. // TODO: Different servers should be using different topics.
// Make this configurable somehow? // Make this configurable somehow?
cfg.Kafka.Topics.InputRoomEvent = "test.room.input"
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
// TODO: Use different databases for the different schemas. // TODO: Use different databases for the different schemas.

View file

@ -16,12 +16,9 @@ package storage
import ( import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/common"
) )
type statements struct { type statements struct {
common.PartitionOffsetStatements
eventTypeStatements eventTypeStatements
eventStateKeyStatements eventStateKeyStatements
roomStatements roomStatements
@ -35,10 +32,6 @@ type statements struct {
func (s *statements) prepare(db *sql.DB) error { func (s *statements) prepare(db *sql.DB) error {
var err error var err error
if err = s.PartitionOffsetStatements.Prepare(db); err != nil {
return err
}
if err = s.eventTypeStatements.prepare(db); err != nil { if err = s.eventTypeStatements.prepare(db); err != nil {
return err return err
} }

View file

@ -18,7 +18,6 @@ import (
"database/sql" "database/sql"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -42,16 +41,6 @@ func Open(dataSourceName string) (*Database, error) {
return &d, nil return &d, nil
} }
// PartitionOffsets implements input.ConsumerDatabase
func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
return d.statements.SelectPartitionOffsets(topic)
}
// SetPartitionOffset implements input.ConsumerDatabase
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
return d.statements.UpsertPartitionOffset(topic, partition, offset)
}
// StoreEvent implements input.EventDatabase // StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) { func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) {
var ( var (