From 926600c1d0b0988fee186caa176a08a19180c755 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 13 Jul 2017 12:21:12 +0100 Subject: [PATCH] Use HTTP API for roomserver input. --- .../clientapi/producers/roomserver.go | 61 +++---------------- .../cmd/dendrite-client-api-server/main.go | 7 +-- .../dendrite-federation-api-server/main.go | 4 +- 3 files changed, 14 insertions(+), 58 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index 3b46487a2..34455ddbc 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -15,35 +15,24 @@ package producers import ( - "encoding/json" - "fmt" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" - sarama "gopkg.in/Shopify/sarama.v1" ) // RoomserverProducer produces events for the roomserver to consume. type RoomserverProducer struct { - Topic string - Producer sarama.SyncProducer + InputAPI api.RoomserverInputAPI } // NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } +func NewRoomserverProducer(roomserverURI string) *RoomserverProducer { return &RoomserverProducer{ - Topic: topic, - Producer: producer, - }, nil + InputAPI: api.NewRoomserverInputAPIHTTP(roomserverURI, nil), + } } // SendEvents writes the given events to the roomserver input log. The events are written with KindNew. func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error { - eventIDs := make([]string, len(events)) ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { ires[i] = api.InputRoomEvent{ @@ -52,9 +41,8 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), } - eventIDs[i] = event.EventID() } - return c.SendInputRoomEvents(ires, eventIDs) + return c.SendInputRoomEvents(ires) } // SendEventWithState writes an event with KindNew to the roomserver input log @@ -65,7 +53,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat return err } - eventIDs := make([]string, len(outliers)+1) ires := make([]api.InputRoomEvent, len(outliers)+1) for i, outlier := range outliers { ires[i] = api.InputRoomEvent{ @@ -73,7 +60,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat Event: outlier, AuthEventIDs: outlier.AuthEventIDs(), } - eventIDs[i] = outlier.EventID() } stateEventIDs := make([]string, len(state.StateEvents)) @@ -88,41 +74,14 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat HasState: true, StateEventIDs: stateEventIDs, } - eventIDs[len(outliers)] = event.EventID() - return c.SendInputRoomEvents(ires, eventIDs) + return c.SendInputRoomEvents(ires) } // SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both // arrays must match, and each element must correspond to the same event. -func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error { - // TODO: Nicer way of doing this. Options are: - // A) Like this - // B) Add EventID field to InputRoomEvent - // C) Add wrapper struct with the EventID and the InputRoomEvent - if len(eventIDs) != len(ires) { - return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires)) - } - - msgs := make([]*sarama.ProducerMessage, len(ires)) - for i := range ires { - msg, err := c.toProducerMessage(ires[i], eventIDs[i]) - if err != nil { - return err - } - msgs[i] = msg - } - return c.Producer.SendMessages(msgs) -} - -func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) { - value, err := json.Marshal(ire) - if err != nil { - return nil, err - } - var m sarama.ProducerMessage - m.Topic = c.Topic - m.Key = sarama.StringEncoder(eventID) - m.Value = sarama.ByteEncoder(value) - return &m, nil +func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent) error { + request := api.InputRoomEventsRequest{InputRoomEvents: ires} + var response api.InputRoomEventsResponse + return c.InputAPI.InputRoomEvents(&request, &response) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index a64cc9a07..9c99facc7 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -50,9 +50,9 @@ func main() { log.Info("config: ", cfg) - roomserverProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), - ) + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) + + roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) userUpdateProducer, err := producers.NewUserUpdateProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), ) @@ -64,7 +64,6 @@ func main() { cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, ) - queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName) if err != nil { log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error()) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index f4f19cdcd..a479ad554 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -67,9 +67,7 @@ func main() { queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) - roomserverProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), - ) + roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) if err != nil { log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err)