From 1d5b573a3f1364593df9a53f8b27ed6947950c5a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 7 Feb 2017 14:13:25 +0000 Subject: [PATCH] More comments --- .../matrix-org/dendrite/roomserver/input/consumer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 922653724..14a3ce15f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -92,12 +92,19 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { for message := range pc.Messages() { var input api.InputRoomEvent if err := json.Unmarshal(message.Value, &input); err != nil { + // If the message is invalid then log it and move onto the next message in the stream. c.logError(message, err) } else { if err := processRoomEvent(c.DB, input); err != nil { + // If there was an error processing the message then log it and + // move onto the next message in the stream. + // TODO: If the error was due to a problem talking to the database + // then we shouldn't move onto the next message and we should either + // retry processing the message, or panic and kill ourselves. c.logError(message, err) } } + // Advance our position in the stream so that we will start at the right position after a restart. if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil { c.logError(message, err) }