From 115b1e57a7d63736e37246a2e801b71441fc8c08 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 16 Aug 2017 14:01:08 +0100 Subject: [PATCH] Add roomserver consumer --- .../dendrite/clientapi/routing/routing.go | 7 ++ .../dendrite-public-rooms-api-server/main.go | 9 ++ .../publicroomsapi/consumers/roomserver.go | 88 +++++++++++++++++++ .../publicroomsapi/storage/storage.go | 10 +++ 4 files changed, 114 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 68a9de075..f286b9b65 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -100,6 +100,13 @@ func Setup( return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], &emptyString, cfg, queryAPI, producer) }), ) + r0mux.Handle("/rooms/{roomID}/state/{eventType}/", + common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + emptyString := "" + return writers.SendEvent(req, device, vars["roomID"], vars["eventType"], vars["txnID"], &emptyString, cfg, queryAPI, producer) + }), + ) r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go index f2c35d6a6..f009c1be2 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go @@ -22,6 +22,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/publicroomsapi/consumers" "github.com/matrix-org/dendrite/publicroomsapi/routing" "github.com/matrix-org/dendrite/publicroomsapi/storage" @@ -48,6 +49,14 @@ func main() { log.Panicf("startup: failed to create public rooms server database with data source %s : %s", cfg.Database.PublicRoomsAPI, err) } + roomConsumer, err := consumers.NewOutputRoomEvent(cfg, db) + if err != nil { + log.Panicf("startup: failed to create room server consumer: %s", err) + } + if err = roomConsumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer: %s", err) + } + log.Info("Starting public rooms server on ", cfg.Listen.PublicRoomsAPI) api := mux.NewRouter() diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go new file mode 100644 index 000000000..f4a9793f4 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -0,0 +1,88 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/roomserver/api" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { + roomServerConsumer *common.ContinualConsumer + db *storage.PublicRoomsServerDatabase + query api.RoomserverQueryAPI +} + +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent(cfg *config.Dendrite, store *storage.PublicRoomsServerDatabase) (*OutputRoomEvent, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + return nil, err + } + roomServerURL := cfg.RoomServerURL() + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEvent{ + roomServerConsumer: &consumer, + db: store, + query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *OutputRoomEvent) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the room server output log. +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + + ev := output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "type": ev.Type(), + }).Info("received event from roomserver") + + return s.db.UpdateRoomFromEvent(ev) +} diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go index d693bc6b8..a93a49836 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go @@ -51,6 +51,16 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerData return &PublicRoomsServerDatabase{db, partitions, statements}, nil } +// PartitionOffsets implements common.PartitionStorer +func (d *PublicRoomsServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.partitions.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *PublicRoomsServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.partitions.UpsertPartitionOffset(topic, partition, offset) +} + // GetRoomVisibility returns the room visibility as a boolean: true if the room // is publicly visible, false if not. // Returns an error if the retrieval failed.