mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
More comments
This commit is contained in:
parent
15e35062b9
commit
1d5b573a3f
|
|
@ -92,12 +92,19 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
for message := range pc.Messages() {
|
for message := range pc.Messages() {
|
||||||
var input api.InputRoomEvent
|
var input api.InputRoomEvent
|
||||||
if err := json.Unmarshal(message.Value, &input); err != nil {
|
if err := json.Unmarshal(message.Value, &input); err != nil {
|
||||||
|
// If the message is invalid then log it and move onto the next message in the stream.
|
||||||
c.logError(message, err)
|
c.logError(message, err)
|
||||||
} else {
|
} else {
|
||||||
if err := processRoomEvent(c.DB, input); err != nil {
|
if err := processRoomEvent(c.DB, input); err != nil {
|
||||||
|
// If there was an error processing the message then log it and
|
||||||
|
// move onto the next message in the stream.
|
||||||
|
// TODO: If the error was due to a problem talking to the database
|
||||||
|
// then we shouldn't move onto the next message and we should either
|
||||||
|
// retry processing the message, or panic and kill ourselves.
|
||||||
c.logError(message, err)
|
c.logError(message, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Advance our position in the stream so that we will start at the right position after a restart.
|
||||||
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||||
c.logError(message, err)
|
c.logError(message, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue