Implement api.TypingServerInputAPI

This commit is contained in:
Anant Prakash 2018-07-31 19:28:42 +05:30
parent 48ca0d963f
commit cc6c1f7f7e
No known key found for this signature in database
GPG key ID: C5D399F626523045
2 changed files with 127 additions and 0 deletions

View file

@ -0,0 +1,31 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
// OutputTypingEvent is an entry in typing server output kafka log.
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event TypingEvent `json:"event"`
}
// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
Content TypingEventContent `json:"content"`
}
// TypingEventContent for TypingEvent
type TypingEventContent struct {
UserIDs []string `json:"user_ids"`
}

View file

@ -0,0 +1,96 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package input
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"gopkg.in/Shopify/sarama.v1"
)
// TypingServerInputAPI implements api.TypingServerInputAPI
type TypingServerInputAPI struct {
// Cache to store the current typing members in each room.
Cache *cache.TypingCache
// The kafka topic to output new typing events to.
OutputTypingEventTopic string
// kafka producer
Producer sarama.SyncProducer
}
// InputTypingEvent implements api.TypingServerInputAPI
func (t *TypingServerInputAPI) InputTypingEvent(
ctx context.Context,
request *api.InputTypingEventRequest,
response *api.InputTypingEventResponse,
) error {
ite := &request.InputTypingEvent
if ite.Typing {
// user is typing, update our current state of users typing.
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
)
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
} else {
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
}
return t.sendUpdateForRoom(ite.RoomID)
}
func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error {
userIDs := t.Cache.GetTypingUsers(roomID)
event := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: roomID,
Content: api.TypingEventContent{UserIDs: userIDs},
}
eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event})
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: string(t.OutputTypingEventTopic),
Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(eventJSON),
}
_, _, err = t.Producer.SendMessage(m)
return err
}
// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux.
func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.TypingServerInputTypingEventPath,
common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
var request api.InputTypingEventRequest
var response api.InputTypingEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}