2017-04-20 17:40:52 -05:00
|
|
|
// Copyright 2017 Vector Creations Ltd
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-03-15 08:36:26 -05:00
|
|
|
package producers
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
|
|
sarama "gopkg.in/Shopify/sarama.v1"
|
|
|
|
)
|
|
|
|
|
|
|
|
// RoomserverProducer produces events for the roomserver to consume.
|
|
|
|
type RoomserverProducer struct {
|
|
|
|
Topic string
|
|
|
|
Producer sarama.SyncProducer
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewRoomserverProducer creates a new RoomserverProducer
|
|
|
|
func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) {
|
|
|
|
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return &RoomserverProducer{
|
|
|
|
Topic: topic,
|
|
|
|
Producer: producer,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
|
2017-06-27 09:28:44 -05:00
|
|
|
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error {
|
2017-03-15 08:36:26 -05:00
|
|
|
eventIDs := make([]string, len(events))
|
|
|
|
ires := make([]api.InputRoomEvent, len(events))
|
2017-05-25 10:08:28 -05:00
|
|
|
for i, event := range events {
|
|
|
|
ires[i] = api.InputRoomEvent{
|
2017-03-15 08:36:26 -05:00
|
|
|
Kind: api.KindNew,
|
2017-07-13 05:41:30 -05:00
|
|
|
Event: event,
|
2017-07-07 08:11:32 -05:00
|
|
|
AuthEventIDs: event.AuthEventIDs(),
|
2017-06-27 09:28:44 -05:00
|
|
|
SendAsServer: string(sendAsServer),
|
2017-03-15 08:36:26 -05:00
|
|
|
}
|
2017-05-25 10:08:28 -05:00
|
|
|
eventIDs[i] = event.EventID()
|
2017-03-15 08:36:26 -05:00
|
|
|
}
|
|
|
|
return c.SendInputRoomEvents(ires, eventIDs)
|
|
|
|
}
|
|
|
|
|
2017-05-25 10:08:28 -05:00
|
|
|
// SendEventWithState writes an event with KindNew to the roomserver input log
|
|
|
|
// with the state at the event as KindOutlier before it.
|
|
|
|
func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespState, event gomatrixserverlib.Event) error {
|
|
|
|
outliers, err := state.Events()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
eventIDs := make([]string, len(outliers)+1)
|
|
|
|
ires := make([]api.InputRoomEvent, len(outliers)+1)
|
|
|
|
for i, outlier := range outliers {
|
|
|
|
ires[i] = api.InputRoomEvent{
|
|
|
|
Kind: api.KindOutlier,
|
2017-07-13 05:41:30 -05:00
|
|
|
Event: outlier,
|
2017-07-07 08:11:32 -05:00
|
|
|
AuthEventIDs: outlier.AuthEventIDs(),
|
2017-05-25 10:08:28 -05:00
|
|
|
}
|
|
|
|
eventIDs[i] = outlier.EventID()
|
|
|
|
}
|
|
|
|
|
|
|
|
stateEventIDs := make([]string, len(state.StateEvents))
|
|
|
|
for i := range state.StateEvents {
|
|
|
|
stateEventIDs[i] = state.StateEvents[i].EventID()
|
|
|
|
}
|
|
|
|
|
|
|
|
ires[len(outliers)] = api.InputRoomEvent{
|
|
|
|
Kind: api.KindNew,
|
2017-07-13 05:41:30 -05:00
|
|
|
Event: event,
|
2017-07-07 08:11:32 -05:00
|
|
|
AuthEventIDs: event.AuthEventIDs(),
|
2017-05-25 10:08:28 -05:00
|
|
|
HasState: true,
|
|
|
|
StateEventIDs: stateEventIDs,
|
|
|
|
}
|
|
|
|
eventIDs[len(outliers)] = event.EventID()
|
|
|
|
|
|
|
|
return c.SendInputRoomEvents(ires, eventIDs)
|
|
|
|
}
|
|
|
|
|
2017-03-15 08:36:26 -05:00
|
|
|
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
|
|
|
|
// arrays must match, and each element must correspond to the same event.
|
|
|
|
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error {
|
|
|
|
// TODO: Nicer way of doing this. Options are:
|
|
|
|
// A) Like this
|
|
|
|
// B) Add EventID field to InputRoomEvent
|
|
|
|
// C) Add wrapper struct with the EventID and the InputRoomEvent
|
|
|
|
if len(eventIDs) != len(ires) {
|
|
|
|
return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires))
|
|
|
|
}
|
|
|
|
|
|
|
|
msgs := make([]*sarama.ProducerMessage, len(ires))
|
|
|
|
for i := range ires {
|
|
|
|
msg, err := c.toProducerMessage(ires[i], eventIDs[i])
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
msgs[i] = msg
|
|
|
|
}
|
|
|
|
return c.Producer.SendMessages(msgs)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) {
|
|
|
|
value, err := json.Marshal(ire)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var m sarama.ProducerMessage
|
|
|
|
m.Topic = c.Topic
|
|
|
|
m.Key = sarama.StringEncoder(eventID)
|
|
|
|
m.Value = sarama.ByteEncoder(value)
|
|
|
|
return &m, nil
|
|
|
|
}
|