From 089506e6af102bd20dbafec8917b0a609f8946d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Oct 2017 17:35:08 +0100 Subject: [PATCH] Handle duplicate kafka messages The way we store the partition offsets for kafka streams means that when we start after a crash we may get the last message we processed again. This means that we have to be careful to ensure that the processing handles consecutive duplicates correctly. --- .../federationsender/consumers/roomserver.go | 16 +++++----- .../federationsender/storage/storage.go | 29 +++++++++++++++---- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 04fc1265e..41894113f 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -134,25 +134,27 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error { return err } + if oldJoinedHosts == nil { + // This means that there is nothing to update as this is a duplicate + // message. + return nil + } + if ore.SendAsServer == api.DoNotSendToOtherServers { // Ignore event that we don't need to send anywhere. return nil } // Work out which hosts were joined at the event itself. - joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts) + joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, *oldJoinedHosts) if err != nil { return err } // Send the event. - if err = s.queues.SendEvent( + return s.queues.SendEvent( &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, - ); err != nil { - return err - } - - return nil + ) } // joinedHostsAtEvent works out a list of matrix servers that were joined to diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index fc7f830e4..27a50a12b 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -62,30 +62,47 @@ func (d *Database) prepare() error { } // UpdateRoom updates the joined hosts for a room and returns what the joined -// hosts were before the update. +// hosts were before the update, or nil if this was a duplicate message. +// This is called when we receive a message from kafka, so we pass in +// oldEventID and newEventID to check that we haven't missed any messages or +// this isn't a duplicate message. func (d *Database) UpdateRoom( ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string, -) (joinedHosts []types.JoinedHost, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - if err = d.insertRoom(ctx, txn, roomID); err != nil { +) (*[]types.JoinedHost, error) { + var joinedHostsPtr *[]types.JoinedHost + + err := common.WithTransaction(d.db, func(txn *sql.Tx) error { + err := d.insertRoom(ctx, txn, roomID) + if err != nil { return err } + lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID) if err != nil { return err } + + if lastSentEventID == newEventID { + // We've handled this messages before, so lets just ignore it + return nil + } + if lastSentEventID != oldEventID { return types.EventIDMismatchError{ DatabaseID: lastSentEventID, RoomServerID: oldEventID, } } - joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID) + + joinedHosts, err := d.selectJoinedHosts(ctx, txn, roomID) if err != nil { return err } + + joinedHostsPtr = &joinedHosts + for _, add := range addHosts { err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) if err != nil { @@ -97,5 +114,5 @@ func (d *Database) UpdateRoom( } return d.updateRoom(ctx, txn, roomID, newEventID) }) - return + return joinedHostsPtr, err }