Make roomserver use the factored out partition offsets table

This commit is contained in:
Kegan Dougal 2017-03-24 11:24:08 +00:00
parent 6094195b68
commit f04769efb3
5 changed files with 32 additions and 33 deletions

View file

@ -1,9 +1,6 @@
package storage package common
import ( import "database/sql"
"database/sql"
"github.com/matrix-org/dendrite/roomserver/types"
)
const partitionOffsetsSchema = ` const partitionOffsetsSchema = `
-- The offsets that the server has processed up to. -- The offsets that the server has processed up to.
@ -26,32 +23,37 @@ const upsertPartitionOffsetsSQL = "" +
" ON CONFLICT ON CONSTRAINT topic_partition_unique" + " ON CONFLICT ON CONSTRAINT topic_partition_unique" +
" DO UPDATE SET partition_offset = $3" " DO UPDATE SET partition_offset = $3"
type partitionOffsetStatements struct { // PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
type PartitionOffsetStatements struct {
selectPartitionOffsetsStmt *sql.Stmt selectPartitionOffsetsStmt *sql.Stmt
upsertPartitionOffsetStmt *sql.Stmt upsertPartitionOffsetStmt *sql.Stmt
} }
func (s *partitionOffsetStatements) prepare(db *sql.DB) (err error) { // Prepare converts the raw SQL statements into prepared statements.
func (s *PartitionOffsetStatements) Prepare(db *sql.DB) (err error) {
_, err = db.Exec(partitionOffsetsSchema) _, err = db.Exec(partitionOffsetsSchema)
if err != nil { if err != nil {
return return
} }
if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
return statementList{ return
{&s.selectPartitionOffsetsStmt, selectPartitionOffsetsSQL}, }
{&s.upsertPartitionOffsetStmt, upsertPartitionOffsetsSQL}, if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
}.prepare(db) return
}
return
} }
func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) { // SelectPartitionOffsets returns all the partition offsets for the given topic.
func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]PartitionOffset, error) {
rows, err := s.selectPartitionOffsetsStmt.Query(topic) rows, err := s.selectPartitionOffsetsStmt.Query(topic)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
var results []types.PartitionOffset var results []PartitionOffset
for rows.Next() { for rows.Next() {
var offset types.PartitionOffset var offset PartitionOffset
if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
return nil, err return nil, err
} }
@ -59,7 +61,8 @@ func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]type
return results, nil return results, nil
} }
func (s *partitionOffsetStatements) upsertPartitionOffset(topic string, partition int32, offset int64) error { // UpsertPartitionOffset updates or inserts the partition offset for the given topic.
func (s *PartitionOffsetStatements) UpsertPartitionOffset(topic string, partition int32, offset int64) error {
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset) _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
return err return err
} }

View file

@ -4,17 +4,18 @@ package input
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
sarama "gopkg.in/Shopify/sarama.v1"
"sync/atomic" "sync/atomic"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
sarama "gopkg.in/Shopify/sarama.v1"
) )
// A ConsumerDatabase has the storage APIs needed by the consumer. // A ConsumerDatabase has the storage APIs needed by the consumer.
type ConsumerDatabase interface { type ConsumerDatabase interface {
RoomEventDatabase RoomEventDatabase
// PartitionOffsets returns the offsets the consumer has reached for each partition. // PartitionOffsets returns the offsets the consumer has reached for each partition.
PartitionOffsets(topic string) ([]types.PartitionOffset, error) PartitionOffsets(topic string) ([]common.PartitionOffset, error)
// SetPartitionOffset records where the consumer has reached for a partition. // SetPartitionOffset records where the consumer has reached for a partition.
SetPartitionOffset(topic string, partition int32, offset int64) error SetPartitionOffset(topic string, partition int32, offset int64) error
} }

View file

@ -2,10 +2,12 @@ package storage
import ( import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/common"
) )
type statements struct { type statements struct {
partitionOffsetStatements common.PartitionOffsetStatements
eventTypeStatements eventTypeStatements
eventStateKeyStatements eventStateKeyStatements
roomStatements roomStatements
@ -19,7 +21,7 @@ type statements struct {
func (s *statements) prepare(db *sql.DB) error { func (s *statements) prepare(db *sql.DB) error {
var err error var err error
if err = s.partitionOffsetStatements.prepare(db); err != nil { if err = s.PartitionOffsetStatements.Prepare(db); err != nil {
return err return err
} }

View file

@ -4,6 +4,7 @@ import (
"database/sql" "database/sql"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -28,13 +29,13 @@ func Open(dataSourceName string) (*Database, error) {
} }
// PartitionOffsets implements input.ConsumerDatabase // PartitionOffsets implements input.ConsumerDatabase
func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) { func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
return d.statements.selectPartitionOffsets(topic) return d.statements.SelectPartitionOffsets(topic)
} }
// SetPartitionOffset implements input.ConsumerDatabase // SetPartitionOffset implements input.ConsumerDatabase
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
return d.statements.upsertPartitionOffset(topic, partition, offset) return d.statements.UpsertPartitionOffset(topic, partition, offset)
} }
// StoreEvent implements input.EventDatabase // StoreEvent implements input.EventDatabase

View file

@ -5,14 +5,6 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// A PartitionOffset is the offset into a partition of the input log.
type PartitionOffset struct {
// The ID of the partition.
Partition int32
// The offset into the partition.
Offset int64
}
// EventTypeNID is a numeric ID for an event type. // EventTypeNID is a numeric ID for an event type.
type EventTypeNID int64 type EventTypeNID int64