Use HTTP API for roomserver input.

This commit is contained in:
Mark Haines 2017-07-13 12:21:12 +01:00
parent d3a29b7816
commit 926600c1d0
3 changed files with 14 additions and 58 deletions

View file

@ -15,35 +15,24 @@
package producers package producers
import ( import (
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
sarama "gopkg.in/Shopify/sarama.v1"
) )
// RoomserverProducer produces events for the roomserver to consume. // RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct { type RoomserverProducer struct {
Topic string InputAPI api.RoomserverInputAPI
Producer sarama.SyncProducer
} }
// NewRoomserverProducer creates a new RoomserverProducer // NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) { func NewRoomserverProducer(roomserverURI string) *RoomserverProducer {
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
if err != nil {
return nil, err
}
return &RoomserverProducer{ return &RoomserverProducer{
Topic: topic, InputAPI: api.NewRoomserverInputAPIHTTP(roomserverURI, nil),
Producer: producer, }
}, nil
} }
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. // SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error { func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error {
eventIDs := make([]string, len(events))
ires := make([]api.InputRoomEvent, len(events)) ires := make([]api.InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
@ -52,9 +41,8 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer), SendAsServer: string(sendAsServer),
} }
eventIDs[i] = event.EventID()
} }
return c.SendInputRoomEvents(ires, eventIDs) return c.SendInputRoomEvents(ires)
} }
// SendEventWithState writes an event with KindNew to the roomserver input log // SendEventWithState writes an event with KindNew to the roomserver input log
@ -65,7 +53,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
return err return err
} }
eventIDs := make([]string, len(outliers)+1)
ires := make([]api.InputRoomEvent, len(outliers)+1) ires := make([]api.InputRoomEvent, len(outliers)+1)
for i, outlier := range outliers { for i, outlier := range outliers {
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
@ -73,7 +60,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
Event: outlier, Event: outlier,
AuthEventIDs: outlier.AuthEventIDs(), AuthEventIDs: outlier.AuthEventIDs(),
} }
eventIDs[i] = outlier.EventID()
} }
stateEventIDs := make([]string, len(state.StateEvents)) stateEventIDs := make([]string, len(state.StateEvents))
@ -88,41 +74,14 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
HasState: true, HasState: true,
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
} }
eventIDs[len(outliers)] = event.EventID()
return c.SendInputRoomEvents(ires, eventIDs) return c.SendInputRoomEvents(ires)
} }
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both // SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
// arrays must match, and each element must correspond to the same event. // arrays must match, and each element must correspond to the same event.
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error { func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent) error {
// TODO: Nicer way of doing this. Options are: request := api.InputRoomEventsRequest{InputRoomEvents: ires}
// A) Like this var response api.InputRoomEventsResponse
// B) Add EventID field to InputRoomEvent return c.InputAPI.InputRoomEvents(&request, &response)
// C) Add wrapper struct with the EventID and the InputRoomEvent
if len(eventIDs) != len(ires) {
return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires))
}
msgs := make([]*sarama.ProducerMessage, len(ires))
for i := range ires {
msg, err := c.toProducerMessage(ires[i], eventIDs[i])
if err != nil {
return err
}
msgs[i] = msg
}
return c.Producer.SendMessages(msgs)
}
func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) {
value, err := json.Marshal(ire)
if err != nil {
return nil, err
}
var m sarama.ProducerMessage
m.Topic = c.Topic
m.Key = sarama.StringEncoder(eventID)
m.Value = sarama.ByteEncoder(value)
return &m, nil
} }

View file

@ -50,9 +50,9 @@ func main() {
log.Info("config: ", cfg) log.Info("config: ", cfg)
roomserverProducer, err := producers.NewRoomserverProducer( queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
) roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
userUpdateProducer, err := producers.NewUserUpdateProducer( userUpdateProducer, err := producers.NewUserUpdateProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
) )
@ -64,7 +64,6 @@ func main() {
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
) )
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName) accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName)
if err != nil { if err != nil {
log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error()) log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error())

View file

@ -67,9 +67,7 @@ func main() {
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
roomserverProducer, err := producers.NewRoomserverProducer( roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
)
if err != nil { if err != nil {
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err) log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err)