From ed8601170473633958a0c3b413b2ff15e841c9ad Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 17 Aug 2017 15:07:45 +0100 Subject: [PATCH] Retrieve state events from the roomserver query API + avoid dupes on join --- .../cmd/dendrite-monolith-server/main.go | 2 +- .../dendrite-public-rooms-api-server/main.go | 5 ++- .../publicroomsapi/consumers/roomserver.go | 12 +++++- .../publicroomsapi/storage/storage.go | 38 ++++++++++++++++--- 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 1dbbe1903..25b269b89 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -300,7 +300,7 @@ func (m *monolith) setupConsumers() { } publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent( - m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, + m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI, ) if err = publicRoomsAPIConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go index 21f65410e..c8e705f91 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/publicroomsapi/consumers" "github.com/matrix-org/dendrite/publicroomsapi/routing" "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/roomserver/api" log "github.com/Sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" @@ -46,6 +47,8 @@ func main() { log.Fatalf("Invalid config file: %s", err) } + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) + db, err := storage.NewPublicRoomsServerDatabase(string(cfg.Database.PublicRoomsAPI)) if err != nil { log.Panicf("startup: failed to create public rooms server database with data source %s : %s", cfg.Database.PublicRoomsAPI, err) @@ -64,7 +67,7 @@ func main() { }).Panic("Failed to setup kafka consumers") } - roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db) + roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db, queryAPI) if err != nil { log.Panicf("startup: failed to create room server consumer: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go index 6a308fd21..ab4c80bed 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -37,8 +37,8 @@ func NewOutputRoomEvent( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store *storage.PublicRoomsServerDatabase, + queryAPI api.RoomserverQueryAPI, ) *OutputRoomEvent { - consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, @@ -47,6 +47,7 @@ func NewOutputRoomEvent( s := &OutputRoomEvent{ roomServerConsumer: &consumer, db: store, + query: queryAPI, } consumer.ProcessMessage = s.onMessage @@ -82,5 +83,12 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { "type": ev.Type(), }).Info("received event from roomserver") - return s.db.UpdateRoomFromEvent(ev) + queryReq := api.QueryEventsByIDRequest{output.NewRoomEvent.AddsStateEventIDs} + var queryRes api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&queryReq, &queryRes); err != nil { + log.Warn(err) + return err + } + + return s.db.UpdateRoomFromEvents(queryRes.Events, output.NewRoomEvent.RemovesStateEventIDs) } diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go index 664ed1649..d6bef0951 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go @@ -90,13 +90,32 @@ func (d *PublicRoomsServerDatabase) GetPublicRooms(offset int64, limit int16, fi return d.statements.selectPublicRooms(offset, limit, filter) } +// UpdateRoomFromEvents iterate over a slice of state events and call +// UpdateRoomFromEvent on each of them to update the database representation of +// the rooms updated by each event. +// If the update triggered by one of the events failed, aborts the process and +// returns an error. +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents( + eventsToAdd []gomatrixserverlib.Event, iDsToRemove []string, +) error { + for _, event := range eventsToAdd { + if err := d.UpdateRoomFromEvent(event, iDsToRemove); err != nil { + return err + } + } + + return nil +} + // UpdateRoomFromEvent updates the database representation of a room from a Matrix event, by // checking the event's type to know which attribute to change and using the event's content // to define the new value of the attribute. // If the event doesn't match with any property used to compute the public room directory, // does nothing. // If something went wrong during the process, returns an error. -func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib.Event) error { +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent( + event gomatrixserverlib.Event, iDsToRemove []string, +) error { roomID := event.RoomID() // Process the event according to its type @@ -104,7 +123,7 @@ func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib. case "m.room.create": return d.statements.insertNewRoom(roomID) case "m.room.member": - return d.updateNumJoinedUsers(event, roomID) + return d.updateNumJoinedUsers(event, iDsToRemove, roomID) case "m.room.aliases": return d.updateRoomAliases(event, roomID) case "m.room.canonical_alias": @@ -145,17 +164,26 @@ func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(event gomatrixserverlib. return nil } -// updateNumJoinedUsers updates the number of joined user in the database representation of the room identified by -// a given room ID, using a given Matrix event. +// updateNumJoinedUsers updates the number of joined user in the database representation +// of the room identified by a given room ID, using a given Matrix event. // If the event's membership is "join", increments the value, if not, decrements it. // Returns an error if the update failed. -func (d *PublicRoomsServerDatabase) updateNumJoinedUsers(membershipEvent gomatrixserverlib.Event, roomID string) error { +func (d *PublicRoomsServerDatabase) updateNumJoinedUsers( + membershipEvent gomatrixserverlib.Event, iDsToRemove []string, roomID string, +) error { membership, err := membershipEvent.Membership() if err != nil { return err } if membership == "join" { + if len(iDsToRemove) > 0 { + // This is an update for an user who already joined the room, most + // likely triggered by a profile update. We don't increment the + // number of joined member in this case because we already counted + // this user. + return nil + } return d.statements.incrementJoinedMembersInRoom(roomID) } else if membership == "leave" || membership == "ban" { return d.statements.decrementJoinedMembersInRoom(roomID)