mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-14 10:23:46 -06:00
Clean roomserver consumer
This commit is contained in:
parent
34165c7761
commit
f61bfdc6af
|
|
@ -35,12 +35,9 @@ type OutputRoomEvent struct {
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
query api.RoomserverQueryAPI
|
query api.RoomserverQueryAPI
|
||||||
serverName gomatrixserverlib.ServerName
|
|
||||||
keyID gomatrixserverlib.KeyID
|
|
||||||
privateKey []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type prevMembership struct {
|
type prevEventRef struct {
|
||||||
PrevContent json.RawMessage `json:"prev_content"`
|
PrevContent json.RawMessage `json:"prev_content"`
|
||||||
PrevID string `json:"replaces_state"`
|
PrevID string `json:"replaces_state"`
|
||||||
UserID string `json:"prev_sender"`
|
UserID string `json:"prev_sender"`
|
||||||
|
|
@ -64,9 +61,6 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
|
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
|
||||||
serverName: cfg.Matrix.ServerName,
|
|
||||||
keyID: cfg.Matrix.KeyID,
|
|
||||||
privateKey: cfg.Matrix.PrivateKey,
|
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
|
@ -113,13 +107,13 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}).Panicf("roomserver output log: state event lookup failure")
|
}).Panicf("roomserver output log: state event lookup failure")
|
||||||
}
|
}
|
||||||
|
|
||||||
ev, err = s.updateStateEvent(ev, s.keyID, s.privateKey)
|
ev, err = s.updateStateEvent(ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range addsStateEvents {
|
for i := range addsStateEvents {
|
||||||
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i], s.keyID, s.privateKey)
|
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -201,10 +195,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEvent) updateStateEvent(
|
func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
|
||||||
event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID,
|
|
||||||
privateKey []byte,
|
|
||||||
) (gomatrixserverlib.Event, error) {
|
|
||||||
var stateKey string
|
var stateKey string
|
||||||
if event.StateKey() == nil {
|
if event.StateKey() == nil {
|
||||||
stateKey = ""
|
stateKey = ""
|
||||||
|
|
@ -221,7 +212,7 @@ func (s *OutputRoomEvent) updateStateEvent(
|
||||||
return event, nil
|
return event, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
prev := prevMembership{
|
prev := prevEventRef{
|
||||||
PrevContent: prevEvent.Content(),
|
PrevContent: prevEvent.Content(),
|
||||||
PrevID: prevEvent.EventID(),
|
PrevID: prevEvent.EventID(),
|
||||||
UserID: prevEvent.Sender(),
|
UserID: prevEvent.Sender(),
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue