diff --git a/eduserver/input/input.go b/eduserver/input/input.go index c54fb9de8..9cbecae51 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -39,6 +39,8 @@ type EDUServerInputAPI struct { OutputSendToDeviceEventTopic string // The kafka topic to output new receipt events to OutputReceiptEventTopic string + // The kafka topic to output presence changes to + OutputPresenceTopic string // kafka producer Producer sarama.SyncProducer // Internal user query API @@ -47,6 +49,9 @@ type EDUServerInputAPI struct { ServerName gomatrixserverlib.ServerName } +// Ensure EDUServerInputAPI implements EDUServerInputAPI +var _ api.EDUServerInputAPI = (*EDUServerInputAPI)(nil) + // InputTypingEvent implements api.EDUServerInputAPI func (t *EDUServerInputAPI) InputTypingEvent( ctx context.Context, @@ -203,3 +208,34 @@ func (t *EDUServerInputAPI) InputReceiptEvent( _, _, err = t.Producer.SendMessage(m) return err } + +// InputPresence implements api.EDUServerInputAPI +// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events) +func (t *EDUServerInputAPI) InputPresence( + ctx context.Context, + request *api.InputPresenceRequest, + response *api.InputPresenceResponse, +) error { + logrus.WithFields(logrus.Fields{ + "user_id": request.UserID, + "status": request.Status, + "status_msg": request.StatusMsg, + }).Infof("Producing to topic '%s'", t.OutputPresenceTopic) + output := &api.OutputPresence{ + UserID: request.UserID, + Status: request.Status, + StatusMsg: request.StatusMsg, + Timestamp: request.Timestamp, + } + js, err := json.Marshal(output) + if err != nil { + return err + } + m := &sarama.ProducerMessage{ + Topic: t.OutputPresenceTopic, + Key: sarama.StringEncoder(request.UserID), + Value: sarama.ByteEncoder(js), + } + _, _, err = t.Producer.SendMessage(m) + return err +}