From a26555c76747721e6b28318fbc39b593200ad8d3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 19 Oct 2020 13:26:58 +0100 Subject: [PATCH] Sync API consumes old room events --- syncapi/consumers/roomserver.go | 37 ++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index ca48c8300..373baea54 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeOldRoomEvent: + return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: @@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( log.ErrorKey: err, "add": msg.AddsStateEventIDs, "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") + }).Panicf("roomserver output log: write new event failure") + return nil + } + + if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil { + logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) + return err + } + + s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + + return nil +} + +func (s *OutputRoomEventConsumer) onOldRoomEvent( + ctx context.Context, msg api.OutputOldRoomEvent, +) error { + ev := msg.Event + + pduPos, err := s.db.WriteEvent( + ctx, + &ev, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, // adds no state + []string{}, // removes no state + nil, // no transaction + false, // not excluded from sync + ) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write old event failure") return nil }