From 917c433fd2d16c8c425b2610998a5c1838f88cfc Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 17 May 2017 17:29:26 +0100 Subject: [PATCH] Minor refactoring (#106) - `s/Server/OutputRoomEvent/` in `consumers` to accurately reflect what is being consumed. - `s/set/userIDSet/` in `notifier.go` for clarity. - Removed lying comments. --- .../cmd/dendrite-sync-api-server/main.go | 8 +++---- .../dendrite/syncapi/consumers/roomserver.go | 14 +++++------ .../dendrite/syncapi/sync/notifier.go | 23 +++++++++---------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 8b1da837b..5d1bf8e2e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -81,12 +81,12 @@ func main() { if err := n.Load(db); err != nil { log.Panicf("startup: failed to set up notifier: %s", err) } - server, err := consumers.NewServer(cfg, n, db) + consumer, err := consumers.NewOutputRoomEvent(cfg, n, db) if err != nil { - log.Panicf("startup: failed to create sync server: %s", err) + log.Panicf("startup: failed to create room server consumer: %s", err) } - if err = server.Start(); err != nil { - log.Panicf("startup: failed to start sync server") + if err = consumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer") } log.Info("Starting sync server on ", *bindAddr) diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 4d703ab32..b8ec98d2c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -28,15 +28,15 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// Server contains all the logic for running a sync server -type Server struct { +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier } -// NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) { +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -47,7 +47,7 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData Consumer: kafkaConsumer, PartitionStore: store, } - s := &Server{ + s := &OutputRoomEvent{ roomServerConsumer: &consumer, db: store, notifier: n, @@ -58,14 +58,14 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData } // Start consuming from room servers -func (s *Server) Start() error { +func (s *OutputRoomEvent) Start() error { return s.roomServerConsumer.Start() } // onMessage is called when the sync server receives a new event from the room server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputRoomEvent if err := json.Unmarshal(msg.Value, &output); err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 1ed0cf55d..1cc9c4e28 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -32,11 +32,10 @@ import ( // in missed events. type Notifier struct { // A map of RoomID => Set : Must only be accessed by the OnNewEvent goroutine - roomIDToJoinedUsers map[string]set + roomIDToJoinedUsers map[string]userIDSet // Protects currPos and userStreams. streamLock *sync.Mutex - // The latest sync stream position: guarded by 'currPosMutex' which is RW to allow - // for concurrent reads on /sync requests + // The latest sync stream position currPos types.StreamPosition // A map of user_id => UserStream which can be used to wake a given user's /sync request. userStreams map[string]*UserStream @@ -44,11 +43,11 @@ type Notifier struct { // NewNotifier creates a new notifier set to the given stream position. // In order for this to be of any use, the Notifier needs to be told all rooms and -// the joined users within each of them by calling Notifier.LoadFromDatabase(). +// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). func NewNotifier(pos types.StreamPosition) *Notifier { return &Notifier{ currPos: pos, - roomIDToJoinedUsers: make(map[string]set), + roomIDToJoinedUsers: make(map[string]userIDSet), userStreams: make(map[string]*UserStream), streamLock: &sync.Mutex{}, } @@ -142,7 +141,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { // This is just the bulk form of addJoinedUser for roomID, userIDs := range roomIDToUserIDs { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(set) + n.roomIDToJoinedUsers[roomID] = make(userIDSet) } for _, userID := range userIDs { n.roomIDToJoinedUsers[roomID].add(userID) @@ -174,7 +173,7 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) addJoinedUser(roomID, userID string) { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(set) + n.roomIDToJoinedUsers[roomID] = make(userIDSet) } n.roomIDToJoinedUsers[roomID].add(userID) } @@ -182,7 +181,7 @@ func (n *Notifier) addJoinedUser(roomID, userID string) { // Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) removeJoinedUser(roomID, userID string) { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(set) + n.roomIDToJoinedUsers[roomID] = make(userIDSet) } n.roomIDToJoinedUsers[roomID].remove(userID) } @@ -196,17 +195,17 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { } // A string set, mainly existing for improving clarity of structs in this file. -type set map[string]bool +type userIDSet map[string]bool -func (s set) add(str string) { +func (s userIDSet) add(str string) { s[str] = true } -func (s set) remove(str string) { +func (s userIDSet) remove(str string) { delete(s, str) } -func (s set) values() (vals []string) { +func (s userIDSet) values() (vals []string) { for str := range s { vals = append(vals, str) }