From 35b628f5bfbf886178e764fbc4788ecf937601b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Oct 2017 13:20:24 +0100 Subject: [PATCH] Handle duplicate kafka messages (#301) The way we store the partition offsets for kafka streams means that when we start after a crash we may get the last message we processed again. This means that we have to be careful to ensure that the processing handles consecutive duplicates correctly. --- .../federationsender/consumers/roomserver.go | 16 ++++++++++------ .../federationsender/storage/storage.go | 19 +++++++++++++++++-- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index d172323ca..a396aaf6d 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -134,6 +134,14 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err return err } + if oldJoinedHosts == nil { + // This means that there is nothing to update as this is a duplicate + // message. + // This can happen if dendrite crashed between reading the message and + // persisting the stream position. + return nil + } + if ore.SendAsServer == api.DoNotSendToOtherServers { // Ignore event that we don't need to send anywhere. return nil @@ -146,13 +154,9 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err } // Send the event. - if err = s.queues.SendEvent( + return s.queues.SendEvent( &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, - ); err != nil { - return err - } - - return nil + ) } // joinedHostsAtEvent works out a list of matrix servers that were joined to diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index fc7f830e4..ab97dc449 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -62,7 +62,10 @@ func (d *Database) prepare() error { } // UpdateRoom updates the joined hosts for a room and returns what the joined -// hosts were before the update. +// hosts were before the update, or nil if this was a duplicate message. +// This is called when we receive a message from kafka, so we pass in +// oldEventID and newEventID to check that we haven't missed any messages or +// this isn't a duplicate message. func (d *Database) UpdateRoom( ctx context.Context, roomID, oldEventID, newEventID string, @@ -70,22 +73,34 @@ func (d *Database) UpdateRoom( removeHosts []string, ) (joinedHosts []types.JoinedHost, err error) { err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - if err = d.insertRoom(ctx, txn, roomID); err != nil { + err = d.insertRoom(ctx, txn, roomID) + if err != nil { return err } + lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID) if err != nil { return err } + + if lastSentEventID == newEventID { + // We've handled this message before, so let's just ignore it. + // We can only get a duplicate for the last message we processed, + // so its enough just to compare the newEventID with lastSentEventID + return nil + } + if lastSentEventID != oldEventID { return types.EventIDMismatchError{ DatabaseID: lastSentEventID, RoomServerID: oldEventID, } } + joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID) if err != nil { return err } + for _, add := range addHosts { err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) if err != nil {