Handle duplicate kafka messages

The way we store the partition offsets for kafka streams means that when
we start after a crash we may get the last message we processed again.
This means that we have to be careful to ensure that the processing
handles consecutive duplicates correctly.
This commit is contained in:
Erik Johnston 2017-10-10 17:35:08 +01:00
parent 996b7d4198
commit 089506e6af
2 changed files with 32 additions and 13 deletions

View file

@ -134,25 +134,27 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error {
return err
}
if oldJoinedHosts == nil {
// This means that there is nothing to update as this is a duplicate
// message.
return nil
}
if ore.SendAsServer == api.DoNotSendToOtherServers {
// Ignore event that we don't need to send anywhere.
return nil
}
// Work out which hosts were joined at the event itself.
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts)
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, *oldJoinedHosts)
if err != nil {
return err
}
// Send the event.
if err = s.queues.SendEvent(
return s.queues.SendEvent(
&ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
); err != nil {
return err
}
return nil
)
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to

View file

@ -62,30 +62,47 @@ func (d *Database) prepare() error {
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update.
// hosts were before the update, or nil if this was a duplicate message.
// This is called when we receive a message from kafka, so we pass in
// oldEventID and newEventID to check that we haven't missed any messages or
// this isn't a duplicate message.
func (d *Database) UpdateRoom(
ctx context.Context,
roomID, oldEventID, newEventID string,
addHosts []types.JoinedHost,
removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
if err = d.insertRoom(ctx, txn, roomID); err != nil {
) (*[]types.JoinedHost, error) {
var joinedHostsPtr *[]types.JoinedHost
err := common.WithTransaction(d.db, func(txn *sql.Tx) error {
err := d.insertRoom(ctx, txn, roomID)
if err != nil {
return err
}
lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
if err != nil {
return err
}
if lastSentEventID == newEventID {
// We've handled this messages before, so lets just ignore it
return nil
}
if lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
}
joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
joinedHosts, err := d.selectJoinedHosts(ctx, txn, roomID)
if err != nil {
return err
}
joinedHostsPtr = &joinedHosts
for _, add := range addHosts {
err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
if err != nil {
@ -97,5 +114,5 @@ func (d *Database) UpdateRoom(
}
return d.updateRoom(ctx, txn, roomID, newEventID)
})
return
return joinedHostsPtr, err
}