Define output room event type

This commit is contained in:
Neil Alexander 2020-09-09 13:43:39 +01:00
parent 54d1228609
commit d67eb6ed2b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 22 additions and 9 deletions

View file

@ -110,9 +110,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
if ore.Historical {
return nil
}
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState())) addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
if err != nil { if err != nil {
@ -143,7 +140,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return nil return nil
} }
if ore.SendAsServer == api.DoNotSendToOtherServers { if ore.SendAsServer == api.DoNotSendToOtherServers || ore.Type == api.OutputRoomState {
// Ignore event that we don't need to send anywhere. // Ignore event that we don't need to send anywhere.
return nil return nil
} }

View file

@ -63,6 +63,17 @@ type OutputEvent struct {
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
} }
// Type of the OutputNewRoomEvent.
type OutputRoomEventType int
const (
// The event is a timeline event and likely just happened.
OutputRoomTimeline OutputRoomEventType = iota
// The event is a state event and quite possibly happened in the past.
OutputRoomState
)
// An OutputNewRoomEvent is written when the roomserver receives a new event. // An OutputNewRoomEvent is written when the roomserver receives a new event.
// It contains the full matrix room event and enough information for a // It contains the full matrix room event and enough information for a
// consumer to construct the current state of the room and the state before the // consumer to construct the current state of the room and the state before the
@ -75,9 +86,9 @@ type OutputEvent struct {
type OutputNewRoomEvent struct { type OutputNewRoomEvent struct {
// The Event. // The Event.
Event gomatrixserverlib.HeaderedEvent `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
// Is the event historical? If so, then downstream components should not treat the // Is the event a timeline event or a state event? Defaults to timeline
// event as if it just arrived. // if not specified.
Historical bool `json:"historical"` Type OutputRoomEventType `json:"type"`
// The latest events in the room after this event. // The latest events in the room after this event.
// This can be used to set the prev events for new events in the room. // This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event. // This also can be used to get the full current state after this event.

View file

@ -307,9 +307,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID latestEventIDs[i] = u.latest[i].EventID
} }
var outputType api.OutputRoomEventType
if u.isHistorical {
outputType = api.OutputRoomState
}
ore := api.OutputNewRoomEvent{ ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion), Event: u.event.Headered(u.roomInfo.RoomVersion),
Historical: u.isHistorical, Type: outputType,
LastSentEventID: u.lastEventIDSent, LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs, LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID, TransactionID: u.transactionID,

View file

@ -143,7 +143,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
} }
} }
if msg.Historical { if msg.Type == api.OutputRoomState {
return nil return nil
} }