mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 01:13:10 -06:00
Retrieve state events from the roomserver query API + avoid dupes on join
This commit is contained in:
parent
1cdea0fd02
commit
ed86011704
|
|
@ -300,7 +300,7 @@ func (m *monolith) setupConsumers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent(
|
publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent(
|
||||||
m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB,
|
m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = publicRoomsAPIConsumer.Start(); err != nil {
|
if err = publicRoomsAPIConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/publicroomsapi/consumers"
|
"github.com/matrix-org/dendrite/publicroomsapi/consumers"
|
||||||
"github.com/matrix-org/dendrite/publicroomsapi/routing"
|
"github.com/matrix-org/dendrite/publicroomsapi/routing"
|
||||||
"github.com/matrix-org/dendrite/publicroomsapi/storage"
|
"github.com/matrix-org/dendrite/publicroomsapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
|
|
@ -46,6 +47,8 @@ func main() {
|
||||||
log.Fatalf("Invalid config file: %s", err)
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
|
|
||||||
db, err := storage.NewPublicRoomsServerDatabase(string(cfg.Database.PublicRoomsAPI))
|
db, err := storage.NewPublicRoomsServerDatabase(string(cfg.Database.PublicRoomsAPI))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create public rooms server database with data source %s : %s", cfg.Database.PublicRoomsAPI, err)
|
log.Panicf("startup: failed to create public rooms server database with data source %s : %s", cfg.Database.PublicRoomsAPI, err)
|
||||||
|
|
@ -64,7 +67,7 @@ func main() {
|
||||||
}).Panic("Failed to setup kafka consumers")
|
}).Panic("Failed to setup kafka consumers")
|
||||||
}
|
}
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db)
|
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db, queryAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,8 @@ func NewOutputRoomEvent(
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
store *storage.PublicRoomsServerDatabase,
|
store *storage.PublicRoomsServerDatabase,
|
||||||
|
queryAPI api.RoomserverQueryAPI,
|
||||||
) *OutputRoomEvent {
|
) *OutputRoomEvent {
|
||||||
|
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
|
|
@ -47,6 +47,7 @@ func NewOutputRoomEvent(
|
||||||
s := &OutputRoomEvent{
|
s := &OutputRoomEvent{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
|
query: queryAPI,
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
|
@ -82,5 +83,12 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
"type": ev.Type(),
|
"type": ev.Type(),
|
||||||
}).Info("received event from roomserver")
|
}).Info("received event from roomserver")
|
||||||
|
|
||||||
return s.db.UpdateRoomFromEvent(ev)
|
queryReq := api.QueryEventsByIDRequest{output.NewRoomEvent.AddsStateEventIDs}
|
||||||
|
var queryRes api.QueryEventsByIDResponse
|
||||||
|
if err := s.query.QueryEventsByID(&queryReq, &queryRes); err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.db.UpdateRoomFromEvents(queryRes.Events, output.NewRoomEvent.RemovesStateEventIDs)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -90,13 +90,32 @@ func (d *PublicRoomsServerDatabase) GetPublicRooms(offset int64, limit int16, fi
|
||||||
return d.statements.selectPublicRooms(offset, limit, filter)
|
return d.statements.selectPublicRooms(offset, limit, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateRoomFromEvents iterate over a slice of state events and call
|
||||||
|
// UpdateRoomFromEvent on each of them to update the database representation of
|
||||||
|
// the rooms updated by each event.
|
||||||
|
// If the update triggered by one of the events failed, aborts the process and
|
||||||
|
// returns an error.
|
||||||
|
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(
|
||||||
|
eventsToAdd []gomatrixserverlib.Event, iDsToRemove []string,
|
||||||
|
) error {
|
||||||
|
for _, event := range eventsToAdd {
|
||||||
|
if err := d.UpdateRoomFromEvent(event, iDsToRemove); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateRoomFromEvent updates the database representation of a room from a Matrix event, by
|
// UpdateRoomFromEvent updates the database representation of a room from a Matrix event, by
|
||||||
// checking the event's type to know which attribute to change and using the event's content
|
// checking the event's type to know which attribute to change and using the event's content
|
||||||
// to define the new value of the attribute.
|
// to define the new value of the attribute.
|
||||||
// If the event doesn't match with any property used to compute the public room directory,
|
// If the event doesn't match with any property used to compute the public room directory,
|
||||||
// does nothing.
|
// does nothing.
|
||||||
// If something went wrong during the process, returns an error.
|
// If something went wrong during the process, returns an error.
|
||||||
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib.Event) error {
|
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(
|
||||||
|
event gomatrixserverlib.Event, iDsToRemove []string,
|
||||||
|
) error {
|
||||||
roomID := event.RoomID()
|
roomID := event.RoomID()
|
||||||
|
|
||||||
// Process the event according to its type
|
// Process the event according to its type
|
||||||
|
|
@ -104,7 +123,7 @@ func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib.
|
||||||
case "m.room.create":
|
case "m.room.create":
|
||||||
return d.statements.insertNewRoom(roomID)
|
return d.statements.insertNewRoom(roomID)
|
||||||
case "m.room.member":
|
case "m.room.member":
|
||||||
return d.updateNumJoinedUsers(event, roomID)
|
return d.updateNumJoinedUsers(event, iDsToRemove, roomID)
|
||||||
case "m.room.aliases":
|
case "m.room.aliases":
|
||||||
return d.updateRoomAliases(event, roomID)
|
return d.updateRoomAliases(event, roomID)
|
||||||
case "m.room.canonical_alias":
|
case "m.room.canonical_alias":
|
||||||
|
|
@ -145,17 +164,26 @@ func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateNumJoinedUsers updates the number of joined user in the database representation of the room identified by
|
// updateNumJoinedUsers updates the number of joined user in the database representation
|
||||||
// a given room ID, using a given Matrix event.
|
// of the room identified by a given room ID, using a given Matrix event.
|
||||||
// If the event's membership is "join", increments the value, if not, decrements it.
|
// If the event's membership is "join", increments the value, if not, decrements it.
|
||||||
// Returns an error if the update failed.
|
// Returns an error if the update failed.
|
||||||
func (d *PublicRoomsServerDatabase) updateNumJoinedUsers(membershipEvent gomatrixserverlib.Event, roomID string) error {
|
func (d *PublicRoomsServerDatabase) updateNumJoinedUsers(
|
||||||
|
membershipEvent gomatrixserverlib.Event, iDsToRemove []string, roomID string,
|
||||||
|
) error {
|
||||||
membership, err := membershipEvent.Membership()
|
membership, err := membershipEvent.Membership()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if membership == "join" {
|
if membership == "join" {
|
||||||
|
if len(iDsToRemove) > 0 {
|
||||||
|
// This is an update for an user who already joined the room, most
|
||||||
|
// likely triggered by a profile update. We don't increment the
|
||||||
|
// number of joined member in this case because we already counted
|
||||||
|
// this user.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return d.statements.incrementJoinedMembersInRoom(roomID)
|
return d.statements.incrementJoinedMembersInRoom(roomID)
|
||||||
} else if membership == "leave" || membership == "ban" {
|
} else if membership == "leave" || membership == "ban" {
|
||||||
return d.statements.decrementJoinedMembersInRoom(roomID)
|
return d.statements.decrementJoinedMembersInRoom(roomID)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue