Add "forward" to sarama

This commit is contained in:
Till Faelligen 2021-06-18 20:52:11 +02:00
parent 2ae8084fd3
commit 9e795ce5e7

View file

@ -39,6 +39,8 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to
OutputReceiptEventTopic string
// The kafka topic to output presence changes to
OutputPresenceTopic string
// kafka producer
Producer sarama.SyncProducer
// Internal user query API
@ -47,6 +49,9 @@ type EDUServerInputAPI struct {
ServerName gomatrixserverlib.ServerName
}
// Ensure EDUServerInputAPI implements EDUServerInputAPI
var _ api.EDUServerInputAPI = (*EDUServerInputAPI)(nil)
// InputTypingEvent implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputTypingEvent(
ctx context.Context,
@ -203,3 +208,34 @@ func (t *EDUServerInputAPI) InputReceiptEvent(
_, _, err = t.Producer.SendMessage(m)
return err
}
// InputPresence implements api.EDUServerInputAPI
// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events)
func (t *EDUServerInputAPI) InputPresence(
ctx context.Context,
request *api.InputPresenceRequest,
response *api.InputPresenceResponse,
) error {
logrus.WithFields(logrus.Fields{
"user_id": request.UserID,
"status": request.Status,
"status_msg": request.StatusMsg,
}).Infof("Producing to topic '%s'", t.OutputPresenceTopic)
output := &api.OutputPresence{
UserID: request.UserID,
Status: request.Status,
StatusMsg: request.StatusMsg,
Timestamp: request.Timestamp,
}
js, err := json.Marshal(output)
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: t.OutputPresenceTopic,
Key: sarama.StringEncoder(request.UserID),
Value: sarama.ByteEncoder(js),
}
_, _, err = t.Producer.SendMessage(m)
return err
}