mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 22:43:10 -06:00
Write output events
This commit is contained in:
parent
c5aef24d02
commit
0eceb3bcd7
85
src/github.com/matrix-org/dendrite/roomserver/api/output.go
Normal file
85
src/github.com/matrix-org/dendrite/roomserver/api/output.go
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// An OutputRoomEvent is written when the roomserver receives a new event.
|
||||
type OutputRoomEvent struct {
|
||||
// The JSON bytes of the event.
|
||||
Event []byte
|
||||
// The state event IDs needed to determine who can see this event.
|
||||
// This can be used to tell which users to send the event to.
|
||||
VisibilityEventIDs []string
|
||||
// The latest events in the room after this event.
|
||||
// This can be used to set the prev events for new events in the room.
|
||||
// This also can be used to get the full current state after this event.
|
||||
LatestEventIDs []string
|
||||
// The state event IDs that were added to the state of the room by this event.
|
||||
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
||||
// view of the current state of the room.
|
||||
AddsStateEventIDs []string
|
||||
// The state event IDs that were removed from the state of the room by this event.
|
||||
RemovesStateEventIDs []string
|
||||
// The ID of the event that was output before this event.
|
||||
// Or the empty string if this is the first event output for this room.
|
||||
// This is used by consumers to check if they can safely update their
|
||||
// current state using the delta supplied in AddsStateEventIDs and
|
||||
// RemovesStateEventIDs.
|
||||
// If they the LastSentEventID doesn't match what they were expecting it to
|
||||
// be they can use the LatestEventIDs to request the full current state.
|
||||
LastSentEventID string
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements json.Unmarshaller
|
||||
func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error {
|
||||
// Create a struct rather than unmarshalling directly into the OutputRoomEvent
|
||||
// so that we can use json.RawMessage.
|
||||
// We use json.RawMessage so that the event JSON is sent as JSON rather than
|
||||
// being base64 encoded which is the default for []byte.
|
||||
var content struct {
|
||||
Event *json.RawMessage
|
||||
VisibilityEventIDs []string
|
||||
LatestEventIDs []string
|
||||
AddsStateEventIDs []string
|
||||
RemovesStateEventIDs []string
|
||||
LastSentEventID string
|
||||
}
|
||||
if err := json.Unmarshal(data, &content); err != nil {
|
||||
return err
|
||||
}
|
||||
if content.Event != nil {
|
||||
ore.Event = []byte(*content.Event)
|
||||
}
|
||||
ore.VisibilityEventIDs = content.VisibilityEventIDs
|
||||
ore.LatestEventIDs = content.LatestEventIDs
|
||||
ore.AddsStateEventIDs = content.AddsStateEventIDs
|
||||
ore.RemovesStateEventIDs = content.RemovesStateEventIDs
|
||||
ore.LastSentEventID = content.LastSentEventID
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalJSON implements json.Marshaller
|
||||
func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) {
|
||||
// Create a struct rather than marshalling directly from the OutputRoomEvent
|
||||
// so that we can use json.RawMessage.
|
||||
// We use json.RawMessage so that the event JSON is sent as JSON rather than
|
||||
// being base64 encoded which is the default for []byte.
|
||||
event := json.RawMessage(ore.Event)
|
||||
content := struct {
|
||||
Event *json.RawMessage
|
||||
VisibilityEventIDs []string
|
||||
LatestEventIDs []string
|
||||
AddsStateEventIDs []string
|
||||
RemovesStateEventIDs []string
|
||||
LastSentEventID string
|
||||
}{
|
||||
Event: &event,
|
||||
VisibilityEventIDs: ore.VisibilityEventIDs,
|
||||
LatestEventIDs: ore.LatestEventIDs,
|
||||
AddsStateEventIDs: ore.AddsStateEventIDs,
|
||||
RemovesStateEventIDs: ore.RemovesStateEventIDs,
|
||||
LastSentEventID: ore.LastSentEventID,
|
||||
}
|
||||
return json.Marshal(&content)
|
||||
}
|
||||
|
|
@ -34,15 +34,32 @@ type Consumer struct {
|
|||
// But any equivalent event streaming protocol could be made to implement the same interface.
|
||||
Consumer sarama.Consumer
|
||||
// The database used to store the room events.
|
||||
DB ConsumerDatabase
|
||||
DB ConsumerDatabase
|
||||
Producer sarama.SyncProducer
|
||||
// The kafkaesque topic to consume room events from.
|
||||
// This is the name used in kafka to identify the stream to consume events from.
|
||||
RoomEventTopic string
|
||||
InputRoomEventTopic string
|
||||
// The kafkaesque topic to output new room events to.
|
||||
// This is the name used in kafka to identify the stream to write events to.
|
||||
OutputRoomEventTopic string
|
||||
// The ErrorLogger for this consumer.
|
||||
// If left as nil then the consumer will panic when it encounters an error
|
||||
ErrorLogger ErrorLogger
|
||||
}
|
||||
|
||||
func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error {
|
||||
var m sarama.ProducerMessage
|
||||
value, err := json.Marshal(output)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Topic = c.OutputRoomEventTopic
|
||||
m.Key = sarama.StringEncoder("")
|
||||
m.Value = sarama.ByteEncoder(value)
|
||||
_, _, err = c.Producer.SendMessage(&m)
|
||||
return err
|
||||
}
|
||||
|
||||
// Start starts the consumer consuming.
|
||||
// Starts up a goroutine for each partition in the kafka stream.
|
||||
// Returns nil once all the goroutines are started.
|
||||
|
|
@ -50,7 +67,7 @@ type Consumer struct {
|
|||
func (c *Consumer) Start() error {
|
||||
offsets := map[int32]int64{}
|
||||
|
||||
partitions, err := c.Consumer.Partitions(c.RoomEventTopic)
|
||||
partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -59,7 +76,7 @@ func (c *Consumer) Start() error {
|
|||
offsets[partition] = sarama.OffsetOldest
|
||||
}
|
||||
|
||||
storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic)
|
||||
storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -70,7 +87,7 @@ func (c *Consumer) Start() error {
|
|||
|
||||
var partitionConsumers []sarama.PartitionConsumer
|
||||
for partition, offset := range offsets {
|
||||
pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
|
||||
pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset)
|
||||
if err != nil {
|
||||
for _, p := range partitionConsumers {
|
||||
p.Close()
|
||||
|
|
@ -95,7 +112,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
|||
// If the message is invalid then log it and move onto the next message in the stream.
|
||||
c.logError(message, err)
|
||||
} else {
|
||||
if err := processRoomEvent(c.DB, input); err != nil {
|
||||
if err := processRoomEvent(c.DB, c, input); err != nil {
|
||||
// If there was an error processing the message then log it and
|
||||
// move onto the next message in the stream.
|
||||
// TODO: If the error was due to a problem talking to the database
|
||||
|
|
@ -105,7 +122,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
|||
}
|
||||
}
|
||||
// Advance our position in the stream so that we will start at the right position after a restart.
|
||||
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||
if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||
c.logError(message, err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,11 @@ type RoomEventDatabase interface {
|
|||
GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error)
|
||||
}
|
||||
|
||||
func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
||||
type OutputRoomEventWriter interface {
|
||||
WriteOutputRoomEvent(output api.OutputRoomEvent) error
|
||||
}
|
||||
|
||||
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {
|
||||
// Parse and validate the event JSON
|
||||
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event)
|
||||
if err != nil {
|
||||
|
|
@ -95,7 +99,7 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
|||
}
|
||||
|
||||
// Update the extremities of the event graph for the room
|
||||
if err := updateLatestEvents(db, roomNID, stateAtEvent, event); err != nil {
|
||||
if err := updateLatestEvents(db, ow, roomNID, stateAtEvent, event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package input
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
|
@ -22,7 +23,7 @@ import (
|
|||
// 7 <----- latest
|
||||
//
|
||||
func updateLatestEvents(
|
||||
db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
db RoomEventDatabase, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
) (err error) {
|
||||
oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID)
|
||||
if err != nil {
|
||||
|
|
@ -42,12 +43,12 @@ func updateLatestEvents(
|
|||
}
|
||||
}()
|
||||
|
||||
err = doUpdateLatestEvents(updater, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event)
|
||||
err = doUpdateLatestEvents(updater, ow, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event)
|
||||
return
|
||||
}
|
||||
|
||||
func doUpdateLatestEvents(
|
||||
updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
updater types.RoomRecentEventsUpdater, ow OutputRoomEventWriter, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
) error {
|
||||
var err error
|
||||
var prevEvents []gomatrixserverlib.EventReference
|
||||
|
|
@ -76,7 +77,11 @@ func doUpdateLatestEvents(
|
|||
StateAtEvent: stateAtEvent,
|
||||
})
|
||||
|
||||
// TODO: Send the event to the output logs.
|
||||
// Send the event to the output logs.
|
||||
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||
if err = writeEvent(ow, lastEventIDSent, event, newLatest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil {
|
||||
return err
|
||||
|
|
@ -121,3 +126,17 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc
|
|||
|
||||
return newLatest
|
||||
}
|
||||
|
||||
func writeEvent(ow OutputRoomEventWriter, lastEventIDSent string, event gomatrixserverlib.Event, latest []types.StateAtEventAndReference) error {
|
||||
|
||||
latestEventIDs := make([]string, len(latest))
|
||||
for i := range latest {
|
||||
latestEventIDs[i] = latest[i].EventID
|
||||
}
|
||||
|
||||
return ow.WriteOutputRoomEvent(api.OutputRoomEvent{
|
||||
Event: event.JSON(),
|
||||
LastSentEventID: lastEventIDSent,
|
||||
LatestEventIDs: latestEventIDs,
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,9 +10,10 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
database = os.Getenv("DATABASE")
|
||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
||||
roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT")
|
||||
database = os.Getenv("DATABASE")
|
||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
||||
inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
|
||||
outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT")
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -26,10 +27,17 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
consumer := input.Consumer{
|
||||
Consumer: kafkaConsumer,
|
||||
DB: db,
|
||||
RoomEventTopic: roomEventTopic,
|
||||
Consumer: kafkaConsumer,
|
||||
DB: db,
|
||||
Producer: kafkaProducer,
|
||||
InputRoomEventTopic: inputRoomEventTopic,
|
||||
OutputRoomEventTopic: outputRoomEventTopic,
|
||||
}
|
||||
|
||||
if err = consumer.Start(); err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue