Implement typing over federation (#949)

Also fix a pet peeve of mine: not putting units on things!!!

Manually tested on p2p and works well, with some fudge factor delay.
This commit is contained in:
Kegsay 2020-03-30 16:40:28 +01:00 committed by GitHub
parent 11a8059bba
commit 8fbe9f4078
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 75 additions and 25 deletions

View file

@ -20,7 +20,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
// EDUServerProducer produces events for the typing server to consume
// EDUServerProducer produces events for the EDU server to consume
type EDUServerProducer struct {
InputAPI api.EDUServerInputAPI
}
@ -35,13 +35,13 @@ func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
// SendTyping sends a typing event to EDU server
func (p *EDUServerProducer) SendTyping(
ctx context.Context, userID, roomID string,
typing bool, timeout int64,
typing bool, timeoutMS int64,
) error {
requestData := api.InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
Timeout: timeout,
TimeoutMS: timeoutMS,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}

View file

@ -15,8 +15,11 @@
package main
import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
)
@ -34,10 +37,12 @@ func main() {
alias, input, query := base.CreateHTTPRoomserverAPIs()
asQuery := base.CreateHTTPAppServiceAPIs()
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing,
alias, input, query, asQuery, federationSender,
alias, input, query, asQuery, federationSender, eduProducer,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
@ -67,7 +68,8 @@ func main() {
federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/config"
@ -133,7 +134,8 @@ func main() {
federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)

View file

@ -30,8 +30,8 @@ type InputTypingEvent struct {
RoomID string `json:"room_id"`
// Typing is true if the user is typing, false if they have stopped.
Typing bool `json:"typing"`
// Timeout is the interval for which the user should be marked as typing.
Timeout int64 `json:"timeout"`
// Timeout is the interval in milliseconds for which the user should be marked as typing.
TimeoutMS int64 `json:"timeout"`
// OriginServerTS when the server received the update.
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
}

View file

@ -46,7 +46,7 @@ func (t *EDUServerInputAPI) InputTypingEvent(
if ite.Typing {
// user is typing, update our current state of users typing.
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
time.Duration(ite.TimeoutMS) * time.Millisecond,
)
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
} else {
@ -69,7 +69,7 @@ func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
if ev.Typing {
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
time.Duration(ite.TimeoutMS) * time.Millisecond,
)
ote.ExpireTime = &expireTime
}

View file

@ -41,12 +41,13 @@ func SetupFederationAPIComponent(
queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
eduProducer *producers.EDUServerProducer,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
routing.Setup(
base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI,
roomserverProducer, federationSenderAPI, *keyRing,
roomserverProducer, eduProducer, federationSenderAPI, *keyRing,
federation, accountsDB, deviceDB,
)
}

View file

@ -48,6 +48,7 @@ func Setup(
aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
@ -79,7 +80,7 @@ func Setup(
}
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, query, producer, keys, federation,
cfg, query, producer, eduProducer, keys, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)

View file

@ -36,6 +36,7 @@ func Send(
cfg *config.Dendrite,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
@ -43,13 +44,14 @@ func Send(
context: httpReq.Context(),
query: query,
producer: producer,
eduProducer: eduProducer,
keys: keys,
federation: federation,
}
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []json.RawMessage `json:"edus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
}
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
@ -59,7 +61,9 @@ func Send(
}
}
// TODO: Really we should have a function to convert FederationRequest to txnReq
t.PDUs = txnEvents.PDUs
t.EDUs = txnEvents.EDUs
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName
@ -83,6 +87,7 @@ type txnReq struct {
context context.Context
query api.RoomserverQueryAPI
producer *producers.RoomserverProducer
eduProducer *producers.EDUServerProducer
keys gomatrixserverlib.KeyRing
federation *gomatrixserverlib.FederationClient
}
@ -152,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
}
}
// TODO: Process the EDUs.
t.processEDUs(t.EDUs)
util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID)
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
@ -163,6 +168,29 @@ type unknownRoomError struct {
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for _, e := range edus {
switch e.Type {
case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
var typingPayload struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
continue
}
if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
}
default:
util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu")
}
}
}
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
prevEventIDs := e.PrevEventIDs()

View file

@ -73,6 +73,17 @@ func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
return nil
}
// only send typing events which originated from us
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
return nil
}
if typingServerName != t.ServerName {
log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere")
return nil
}
joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID)
if err != nil {
return err