Merge remote-tracking branch 'matrix-org/master' into travis/joined_rooms

This commit is contained in:
Travis Ralston 2018-10-15 21:43:29 -06:00
commit 0ad58b5a45
67 changed files with 1801 additions and 436 deletions

View file

@ -3,6 +3,7 @@ go:
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x
env:
- TEST_SUITE="lint"

View file

@ -86,6 +86,7 @@ kafka:
topics:
output_room_event: roomserverOutput
output_client_data: clientapiOutput
output_typing_event: typingServerOutput
user_updates: userUpdates
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
@ -114,6 +115,7 @@ listen:
public_rooms_api: "localhost:7775"
federation_sender: "localhost:7776"
appservice_api: "localhost:7777"
typing_server: "localhost:7778"
# The configuration for tracing the dendrite components.
tracing:

View file

@ -18,16 +18,76 @@
package api
import (
"context"
"database/sql"
"errors"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/gomatrixserverlib"
commonHTTP "github.com/matrix-org/dendrite/common/http"
opentracing "github.com/opentracing/opentracing-go"
)
// RoomAliasExistsRequest is a request to an application service
// about whether a room alias exists
type RoomAliasExistsRequest struct {
// Alias we want to lookup
Alias string `json:"alias"`
}
// RoomAliasExistsResponse is a response from an application service
// about whether a room alias exists
type RoomAliasExistsResponse struct {
AliasExists bool `json:"exists"`
}
// UserIDExistsRequest is a request to an application service about whether a
// user ID exists
type UserIDExistsRequest struct {
// UserID we want to lookup
UserID string `json:"user_id"`
}
// UserIDExistsRequestAccessToken is a request to an application service
// about whether a user ID exists. Includes an access token
type UserIDExistsRequestAccessToken struct {
// UserID we want to lookup
UserID string `json:"user_id"`
AccessToken string `json:"access_token"`
}
// UserIDExistsResponse is a response from an application service about
// whether a user ID exists
type UserIDExistsResponse struct {
UserIDExists bool `json:"exists"`
}
// AppServiceQueryAPI is used to query user and room alias data from application
// services
type AppServiceQueryAPI interface {
// TODO: Check whether a room alias exists within any application service namespaces
// TODO: QueryUserIDExists
// Check whether a room alias exists within any application service namespaces
RoomAliasExists(
ctx context.Context,
req *RoomAliasExistsRequest,
resp *RoomAliasExistsResponse,
) error
// Check whether a user ID exists within any application service namespaces
UserIDExists(
ctx context.Context,
req *UserIDExistsRequest,
resp *UserIDExistsResponse,
) error
}
// AppServiceRoomAliasExistsPath is the HTTP path for the RoomAliasExists API
const AppServiceRoomAliasExistsPath = "/api/appservice/RoomAliasExists"
// AppServiceUserIDExistsPath is the HTTP path for the UserIDExists API
const AppServiceUserIDExistsPath = "/api/appservice/UserIDExists"
// httpAppServiceQueryAPI contains the URL to an appservice query API and a
// reference to a httpClient used to reach it
type httpAppServiceQueryAPI struct {
@ -47,3 +107,72 @@ func NewAppServiceQueryAPIHTTP(
}
return &httpAppServiceQueryAPI{appserviceURL, httpClient}
}
// RoomAliasExists implements AppServiceQueryAPI
func (h *httpAppServiceQueryAPI) RoomAliasExists(
ctx context.Context,
request *RoomAliasExistsRequest,
response *RoomAliasExistsResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "appserviceRoomAliasExists")
defer span.Finish()
apiURL := h.appserviceURL + AppServiceRoomAliasExistsPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// UserIDExists implements AppServiceQueryAPI
func (h *httpAppServiceQueryAPI) UserIDExists(
ctx context.Context,
request *UserIDExistsRequest,
response *UserIDExistsResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "appserviceUserIDExists")
defer span.Finish()
apiURL := h.appserviceURL + AppServiceUserIDExistsPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// RetreiveUserProfile is a wrapper that queries both the local database and
// application services for a given user's profile
func RetreiveUserProfile(
ctx context.Context,
userID string,
asAPI AppServiceQueryAPI,
accountDB *accounts.Database,
) (*authtypes.Profile, error) {
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return nil, err
}
// Try to query the user from the local database
profile, err := accountDB.GetProfileByLocalpart(ctx, localpart)
if err != nil && err != sql.ErrNoRows {
return nil, err
} else if profile != nil {
return profile, nil
}
// Query the appservice component for the existence of an AS user
userReq := UserIDExistsRequest{UserID: userID}
var userResp UserIDExistsResponse
if err = asAPI.UserIDExists(ctx, &userReq, &userResp); err != nil {
return nil, err
}
// If no user exists, return
if !userResp.UserIDExists {
return nil, errors.New("no known profile for given user ID")
}
// Try to query the user from the local database again
profile, err = accountDB.GetProfileByLocalpart(ctx, localpart)
if err != nil {
return nil, err
}
// profile should not be nil at this point
return profile, nil
}

View file

@ -74,15 +74,13 @@ func SetupAppServiceAPIComponent(
}
}
// Create a HTTP client that this component will use for all outbound and
// inbound requests (inbound only for the internal API)
httpClient := &http.Client{
Timeout: time.Second * 30,
}
// Create appserivce query API with an HTTP client that will be used for all
// outbound and inbound requests (inbound only for the internal API)
appserviceQueryAPI := query.AppServiceQueryAPI{
HTTPClient: httpClient,
Cfg: base.Cfg,
HTTPClient: &http.Client{
Timeout: time.Second * 30,
},
Cfg: base.Cfg,
}
appserviceQueryAPI.SetupHTTP(http.DefaultServeMux)
@ -92,7 +90,7 @@ func SetupAppServiceAPIComponent(
roomserverQueryAPI, roomserverAliasAPI, workerStates,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
}
// Create application service transaction workers

View file

@ -185,6 +185,7 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
return false
}
// Check Room ID and Sender of the event
if appservice.IsInterestedInUserID(event.Sender()) ||
appservice.IsInterestedInRoomID(event.RoomID()) {
return true

View file

@ -17,18 +17,198 @@
package query
import (
"context"
"encoding/json"
"net/http"
"net/url"
"time"
"github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/util"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
)
const roomAliasExistsPath = "/rooms/"
const userIDExistsPath = "/users/"
// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
type AppServiceQueryAPI struct {
HTTPClient *http.Client
Cfg *config.Dendrite
}
// RoomAliasExists performs a request to '/room/{roomAlias}' on all known
// handling application services until one admits to owning the room
func (a *AppServiceQueryAPI) RoomAliasExists(
ctx context.Context,
request *api.RoomAliasExistsRequest,
response *api.RoomAliasExistsResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias")
defer span.Finish()
// Create an HTTP client if one does not already exist
if a.HTTPClient == nil {
a.HTTPClient = makeHTTPClient()
}
// Determine which application service should handle this request
for _, appservice := range a.Cfg.Derived.ApplicationServices {
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
// The full path to the rooms API, includes hs token
URL, err := url.Parse(appservice.URL + roomAliasExistsPath)
URL.Path += request.Alias
apiURL := URL.String() + "?access_token=" + appservice.HSToken
// Send a request to each application service. If one responds that it has
// created the room, immediately return.
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return err
}
req = req.WithContext(ctx)
resp, err := a.HTTPClient.Do(req)
if resp != nil {
defer func() {
err = resp.Body.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice_id": appservice.ID,
"status_code": resp.StatusCode,
}).WithError(err).Error("Unable to close application service response body")
}
}()
}
if err != nil {
log.WithError(err).Errorf("Issue querying room alias on application service %s", appservice.ID)
return err
}
switch resp.StatusCode {
case http.StatusOK:
// OK received from appservice. Room exists
response.AliasExists = true
return nil
case http.StatusNotFound:
// Room does not exist
default:
// Application service reported an error. Warn
log.WithFields(log.Fields{
"appservice_id": appservice.ID,
"status_code": resp.StatusCode,
}).Warn("Application service responded with non-OK status code")
}
}
}
response.AliasExists = false
return nil
}
// UserIDExists performs a request to '/users/{userID}' on all known
// handling application services until one admits to owning the user ID
func (a *AppServiceQueryAPI) UserIDExists(
ctx context.Context,
request *api.UserIDExistsRequest,
response *api.UserIDExistsResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID")
defer span.Finish()
// Create an HTTP client if one does not already exist
if a.HTTPClient == nil {
a.HTTPClient = makeHTTPClient()
}
// Determine which application service should handle this request
for _, appservice := range a.Cfg.Derived.ApplicationServices {
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
// The full path to the rooms API, includes hs token
URL, err := url.Parse(appservice.URL + userIDExistsPath)
URL.Path += request.UserID
apiURL := URL.String() + "?access_token=" + appservice.HSToken
// Send a request to each application service. If one responds that it has
// created the user, immediately return.
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return err
}
resp, err := a.HTTPClient.Do(req.WithContext(ctx))
if resp != nil {
defer func() {
err = resp.Body.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice_id": appservice.ID,
"status_code": resp.StatusCode,
}).Error("Unable to close application service response body")
}
}()
}
if err != nil {
log.WithFields(log.Fields{
"appservice_id": appservice.ID,
}).WithError(err).Error("issue querying user ID on application service")
return err
}
if resp.StatusCode == http.StatusOK {
// StatusOK received from appservice. User ID exists
response.UserIDExists = true
return nil
}
// Log non OK
log.WithFields(log.Fields{
"appservice_id": appservice.ID,
"status_code": resp.StatusCode,
}).Warn("application service responded with non-OK status code")
}
}
response.UserIDExists = false
return nil
}
// makeHTTPClient creates an HTTP client with certain options that will be used for all query requests to application services
func makeHTTPClient() *http.Client {
return &http.Client{
Timeout: time.Second * 30,
}
}
// SetupHTTP adds the AppServiceQueryPAI handlers to the http.ServeMux. This
// handles and muxes incoming api requests the to internal AppServiceQueryAPI.
func (a *AppServiceQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.AppServiceRoomAliasExistsPath,
common.MakeInternalAPI("appserviceRoomAliasExists", func(req *http.Request) util.JSONResponse {
var request api.RoomAliasExistsRequest
var response api.RoomAliasExistsResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := a.RoomAliasExists(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.AppServiceUserIDExistsPath,
common.MakeInternalAPI("appserviceUserIDExists", func(req *http.Request) util.JSONResponse {
var request api.UserIDExistsRequest
var response api.UserIDExistsResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := a.UserIDExists(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -17,7 +17,6 @@ package workers
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math"
@ -68,12 +67,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Create a HTTP client for sending requests to app services
client := &http.Client{
Timeout: transactionTimeout,
// TODO: Verify certificates
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // nolint: gas
},
},
}
// Initial check for any leftover events to send from last time

View file

@ -72,7 +72,7 @@ func VerifyUserFromRequest(
}
// Try to find the Application Service user
token, err := extractAccessToken(req)
token, err := ExtractAccessToken(req)
if err != nil {
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,
@ -150,7 +150,7 @@ func verifyUserParameters(req *http.Request) *util.JSONResponse {
// and returns the device it corresponds to. Returns resErr (an error response which can be
// sent to the client) if the token is invalid or there was a problem querying the database.
func verifyAccessToken(req *http.Request, deviceDB DeviceDatabase) (device *authtypes.Device, resErr *util.JSONResponse) {
token, err := extractAccessToken(req)
token, err := ExtractAccessToken(req)
if err != nil {
resErr = &util.JSONResponse{
Code: http.StatusUnauthorized,
@ -184,9 +184,9 @@ func GenerateAccessToken() (string, error) {
return base64.RawURLEncoding.EncodeToString(b), nil
}
// extractAccessToken from a request, or return an error detailing what went wrong. The
// ExtractAccessToken from a request, or return an error detailing what went wrong. The
// error message MUST be human-readable and comprehensible to the client.
func extractAccessToken(req *http.Request) (string, error) {
func ExtractAccessToken(req *http.Request) (string, error) {
// cf https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/api/auth.py#L631
authBearer := req.Header.Get("Authorization")
queryToken := req.URL.Query().Get("access_token")

View file

@ -48,13 +48,17 @@ const insertMembershipSQL = `
const selectMembershipsByLocalpartSQL = "" +
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
const selectMembershipInRoomByLocalpartSQL = "" +
"SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2"
const deleteMembershipsByEventIDsSQL = "" +
"DELETE FROM account_memberships WHERE event_id = ANY($1)"
type membershipStatements struct {
deleteMembershipsByEventIDsStmt *sql.Stmt
insertMembershipStmt *sql.Stmt
selectMembershipsByLocalpartStmt *sql.Stmt
deleteMembershipsByEventIDsStmt *sql.Stmt
insertMembershipStmt *sql.Stmt
selectMembershipInRoomByLocalpartStmt *sql.Stmt
selectMembershipsByLocalpartStmt *sql.Stmt
}
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
@ -68,6 +72,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) {
if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil {
return
}
if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil {
return
}
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
return
}
@ -90,6 +97,16 @@ func (s *membershipStatements) deleteMembershipsByEventIDs(
return
}
func (s *membershipStatements) selectMembershipInRoomByLocalpart(
ctx context.Context, localpart, roomID string,
) (authtypes.Membership, error) {
membership := authtypes.Membership{Localpart: localpart, RoomID: roomID}
stmt := s.selectMembershipInRoomByLocalpartStmt
err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID)
return membership, err
}
func (s *membershipStatements) selectMembershipsByLocalpart(
ctx context.Context, localpart string,
) (memberships []authtypes.Membership, err error) {

View file

@ -143,9 +143,8 @@ func (d *Database) CreateAccount(
}
// SaveMembership saves the user matching a given localpart as a member of a given
// room. It also stores the ID of the membership event and a flag on whether the user
// is still in the room.
// If a membership already exists between the user and the room, or of the
// room. It also stores the ID of the membership event.
// If a membership already exists between the user and the room, or if the
// insert fails, returns the SQL error
func (d *Database) saveMembership(
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
@ -153,8 +152,8 @@ func (d *Database) saveMembership(
return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID)
}
// removeMembershipsByEventIDs removes the memberships of which the `join` membership
// event ID is included in a given array of events IDs
// removeMembershipsByEventIDs removes the memberships corresponding to the
// `join` membership events IDs in the eventIDs slice.
// If the removal fails, or if there is no membership to remove, returns an error
func (d *Database) removeMembershipsByEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
@ -185,6 +184,16 @@ func (d *Database) UpdateMemberships(
})
}
// GetMembershipInRoomByLocalpart returns the membership for an user
// matching the given localpart if he is a member of the room matching roomID,
// if not sql.ErrNoRows is returned.
// If there was an issue during the retrieval, returns the SQL error
func (d *Database) GetMembershipInRoomByLocalpart(
ctx context.Context, localpart, roomID string,
) (authtypes.Membership, error) {
return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID)
}
// GetMembershipsByLocalpart returns an array containing the memberships for all
// the rooms a user matching a given localpart is a member of
// If no membership match the given localpart, returns an empty array
@ -195,13 +204,9 @@ func (d *Database) GetMembershipsByLocalpart(
return d.memberships.selectMembershipsByLocalpart(ctx, localpart)
}
// newMembership will save a new membership in the database, with a flag on whether
// the user is still in the room. This flag is set to true if the given state
// event is a "join" membership event and false if the event is a "leave" or "ban"
// membership. If the event isn't a m.room.member event with one of these three
// values, does nothing.
// If the event isn't a "join" membership event, does nothing
// If an error occurred, returns it
// newMembership saves a new membership in the database.
// If the event isn't a valid m.room.member event with type `join`, does nothing.
// If an error occurred, returns the SQL error
func (d *Database) newMembership(
ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event,
) error {

View file

@ -15,6 +15,7 @@
package clientapi
import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/consumers"
@ -23,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@ -38,9 +40,12 @@ func SetupClientAPIComponent(
aliasAPI roomserverAPI.RoomserverAliasAPI,
inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
typingInputAPI typingServerAPI.TypingServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
transactionsCache *transactions.Cache,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
typingProducer := producers.NewTypingServerProducer(typingInputAPI)
userUpdateProducer := &producers.UserUpdateProducer{
Producer: base.KafkaProducer,
@ -60,8 +65,8 @@ func SetupClientAPIComponent(
}
routing.Setup(
base.APIMux, *base.Cfg, roomserverProducer, queryAPI, aliasAPI,
base.APIMux, *base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
syncProducer, transactionsCache,
syncProducer, typingProducer, transactionsCache,
)
}

View file

@ -0,0 +1,39 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httputil
import (
"fmt"
"net/http"
"strconv"
"time"
)
// ParseTSParam takes a req (typically from an application service) and parses a Time object
// from the req if it exists in the query parameters. If it doesn't exist, the
// current time is returned.
func ParseTSParam(req *http.Request) (time.Time, error) {
// Use the ts parameter's value for event time if present
tsStr := req.URL.Query().Get("ts")
if tsStr == "" {
return time.Now(), nil
}
// The parameter exists, parse into a Time object
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return time.Time{}, fmt.Errorf("Param 'ts' is no valid int (%s)", err.Error())
}
return time.Unix(ts/1000, 0), nil
}

View file

@ -28,7 +28,7 @@ type MatrixError struct {
Err string `json:"error"`
}
func (e *MatrixError) Error() string {
func (e MatrixError) Error() string {
return fmt.Sprintf("%s: %s", e.ErrCode, e.Err)
}

View file

@ -0,0 +1,54 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"time"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// TypingServerProducer produces events for the typing server to consume
type TypingServerProducer struct {
InputAPI api.TypingServerInputAPI
}
// NewTypingServerProducer creates a new TypingServerProducer
func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer {
return &TypingServerProducer{
InputAPI: inputAPI,
}
}
// Send typing event to typing server
func (p *TypingServerProducer) Send(
ctx context.Context, userID, roomID string,
typing bool, timeout int64,
) error {
requestData := api.InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
Timeout: timeout,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
var response api.InputTypingEventResponse
err := p.InputAPI.InputTypingEvent(
ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response,
)
return err
}

View file

@ -18,8 +18,10 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/matrix-org/dendrite/roomserver/api"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -70,7 +72,7 @@ func (r createRoomRequest) Validate() *util.JSONResponse {
if strings.ContainsAny(r.RoomAliasName, whitespace+":") {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("room_alias_name cannot contain whitespace"),
JSON: jsonerror.BadJSON("room_alias_name cannot contain whitespace or ':'"),
}
}
for _, userID := range r.Invite {
@ -115,12 +117,13 @@ type fledglingEvent struct {
func CreateRoom(
req *http.Request, device *authtypes.Device,
cfg config.Dendrite, producer *producers.RoomserverProducer,
accountDB *accounts.Database, aliasAPI api.RoomserverAliasAPI,
accountDB *accounts.Database, aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
// TODO (#267): Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
return createRoom(req, device, cfg, roomID, producer, accountDB, aliasAPI)
return createRoom(req, device, cfg, roomID, producer, accountDB, aliasAPI, asAPI)
}
// createRoom implements /createRoom
@ -128,7 +131,8 @@ func CreateRoom(
func createRoom(
req *http.Request, device *authtypes.Device,
cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer,
accountDB *accounts.Database, aliasAPI api.RoomserverAliasAPI,
accountDB *accounts.Database, aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
logger := util.GetLogger(req.Context())
userID := device.UserID
@ -143,8 +147,14 @@ func createRoom(
return *resErr
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
// TODO: visibility/presets/raw initial state/creation content
// TODO: Create room alias association
// Make sure this doesn't fall into an application service's namespace though!
@ -153,12 +163,7 @@ func createRoom(
"roomID": roomID,
}).Info("Creating new room")
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
profile, err := appserviceAPI.RetreiveUserProfile(req.Context(), userID, asAPI, accountDB)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -249,7 +254,7 @@ func createRoom(
builder.PrevEvents = []gomatrixserverlib.EventReference{builtEvents[i-1].EventReference()}
}
var ev *gomatrixserverlib.Event
ev, err = buildEvent(req, &builder, &authEvents, cfg)
ev, err = buildEvent(&builder, &authEvents, cfg, evTime)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -279,13 +284,13 @@ func createRoom(
if r.RoomAliasName != "" {
roomAlias = fmt.Sprintf("#%s:%s", r.RoomAliasName, cfg.Matrix.ServerName)
aliasReq := api.SetRoomAliasRequest{
aliasReq := roomserverAPI.SetRoomAliasRequest{
Alias: roomAlias,
RoomID: roomID,
UserID: userID,
}
var aliasResp api.SetRoomAliasResponse
var aliasResp roomserverAPI.SetRoomAliasResponse
err = aliasAPI.SetRoomAlias(req.Context(), &aliasReq, &aliasResp)
if err != nil {
return httputil.LogThenError(req, err)
@ -309,12 +314,11 @@ func createRoom(
// buildEvent fills out auth_events for the builder then builds the event
func buildEvent(
req *http.Request,
builder *gomatrixserverlib.EventBuilder,
provider gomatrixserverlib.AuthEventProvider,
cfg config.Dendrite,
evTime time.Time,
) (*gomatrixserverlib.Event, error) {
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
return nil, err
@ -325,8 +329,7 @@ func buildEvent(
}
builder.AuthEvents = refs
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
eventTime := common.ParseTSParam(req)
event, err := builder.Build(eventID, eventTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
event, err := builder.Build(eventID, evTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
if err != nil {
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %s", builder.Type, err)
}

View file

@ -15,13 +15,14 @@
package routing
import (
"fmt"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -33,7 +34,7 @@ func DirectoryRoom(
roomAlias string,
federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite,
aliasAPI api.RoomserverAliasAPI,
rsAPI roomserverAPI.RoomserverAliasAPI,
) util.JSONResponse {
_, domain, err := gomatrixserverlib.SplitID('#', roomAlias)
if err != nil {
@ -43,49 +44,46 @@ func DirectoryRoom(
}
}
var resp gomatrixserverlib.RespDirectory
if domain == cfg.Matrix.ServerName {
queryReq := api.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes api.GetRoomIDForAliasResponse
if err = aliasAPI.GetRoomIDForAlias(req.Context(), &queryReq, &queryRes); err != nil {
// Query the roomserver API to check if the alias exists locally
queryReq := roomserverAPI.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes roomserverAPI.GetRoomIDForAliasResponse
if err = rsAPI.GetRoomIDForAlias(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err)
}
// List any roomIDs found associated with this alias
if len(queryRes.RoomID) > 0 {
// TODO: List servers that are aware of this room alias
resp = gomatrixserverlib.RespDirectory{
RoomID: queryRes.RoomID,
Servers: []gomatrixserverlib.ServerName{},
}
} else {
// If the response doesn't contain a non-empty string, return an error
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room alias " + roomAlias + " not found."),
Code: http.StatusOK,
JSON: queryRes,
}
}
} else {
resp, err = federation.LookupRoomAlias(req.Context(), domain, roomAlias)
// Query the federation for this room alias
resp, err := federation.LookupRoomAlias(req.Context(), domain, roomAlias)
if err != nil {
switch x := err.(type) {
switch err.(type) {
case gomatrix.HTTPError:
if x.Code == http.StatusNotFound {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room alias not found"),
}
}
default:
// TODO: Return 502 if the remote server errored.
// TODO: Return 504 if the remote server timed out.
return httputil.LogThenError(req, err)
}
}
if len(resp.RoomID) > 0 {
return util.JSONResponse{
Code: http.StatusOK,
JSON: resp,
}
// TODO: Return 502 if the remote server errored.
// TODO: Return 504 if the remote server timed out.
return httputil.LogThenError(req, err)
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: resp,
Code: http.StatusNotFound,
JSON: jsonerror.NotFound(
fmt.Sprintf("Room alias %s not found", roomAlias),
),
}
}
@ -96,7 +94,7 @@ func SetLocalAlias(
device *authtypes.Device,
alias string,
cfg *config.Dendrite,
aliasAPI api.RoomserverAliasAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
) util.JSONResponse {
_, domain, err := gomatrixserverlib.SplitID('#', alias)
if err != nil {
@ -138,12 +136,12 @@ func SetLocalAlias(
return *resErr
}
queryReq := api.SetRoomAliasRequest{
queryReq := roomserverAPI.SetRoomAliasRequest{
UserID: device.UserID,
RoomID: r.RoomID,
Alias: alias,
}
var queryRes api.SetRoomAliasResponse
var queryRes roomserverAPI.SetRoomAliasResponse
if err := aliasAPI.SetRoomAlias(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err)
}
@ -167,13 +165,13 @@ func RemoveLocalAlias(
req *http.Request,
device *authtypes.Device,
alias string,
aliasAPI api.RoomserverAliasAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
) util.JSONResponse {
queryReq := api.RemoveRoomAliasRequest{
queryReq := roomserverAPI.RemoveRoomAliasRequest{
Alias: alias,
UserID: device.UserID,
}
var queryRes api.RemoveRoomAliasResponse
var queryRes roomserverAPI.RemoveRoomAliasResponse
if err := aliasAPI.RemoveRoomAlias(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err)
}

View file

@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -51,6 +52,14 @@ func JoinRoomByIDOrAlias(
return *resErr
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
return httputil.LogThenError(req, err)
@ -65,7 +74,9 @@ func JoinRoomByIDOrAlias(
content["displayname"] = profile.DisplayName
content["avatar_url"] = profile.AvatarURL
r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing}
r := joinRoomReq{
req, evTime, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing,
}
if strings.HasPrefix(roomIDOrAlias, "!") {
return r.joinRoomByID(roomIDOrAlias)
@ -81,6 +92,7 @@ func JoinRoomByIDOrAlias(
type joinRoomReq struct {
req *http.Request
evTime time.Time
content map[string]interface{}
userID string
cfg config.Dendrite
@ -214,7 +226,7 @@ func (r joinRoomReq) joinRoomUsingServers(
}
var queryRes roomserverAPI.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(r.req, &eb, r.cfg, r.queryAPI, &queryRes)
event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.evTime, r.queryAPI, &queryRes)
if err == nil {
if _, err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(r.req, err)
@ -285,9 +297,8 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
}
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.Matrix.ServerName)
eventTime := common.ParseTSParam(r.req)
event, err := respMakeJoin.JoinEvent.Build(
eventID, eventTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey,
eventID, r.evTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey,
)
if err != nil {
res := httputil.LogThenError(r.req, err)

View file

@ -17,7 +17,10 @@ package routing
import (
"net/http"
"context"
"database/sql"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/httputil"
@ -41,6 +44,7 @@ type passwordRequest struct {
User string `json:"user"`
Password string `json:"password"`
InitialDisplayName *string `json:"initial_device_display_name"`
DeviceID string `json:"device_id"`
}
type loginResponse struct {
@ -105,10 +109,7 @@ func Login(
httputil.LogThenError(req, err)
}
// TODO: Use the device ID in the request
dev, err := deviceDB.CreateDevice(
req.Context(), acc.Localpart, nil, token, r.InitialDisplayName,
)
dev, err := getDevice(req.Context(), r, deviceDB, acc, localpart, token)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
@ -131,3 +132,21 @@ func Login(
JSON: jsonerror.NotFound("Bad method"),
}
}
// check if device exists else create one
func getDevice(
ctx context.Context,
r passwordRequest,
deviceDB *devices.Database,
acc *authtypes.Account,
localpart, token string,
) (dev *authtypes.Device, err error) {
dev, err = deviceDB.GetDeviceByID(ctx, localpart, r.DeviceID)
if err == sql.ErrNoRows {
// device doesn't exist, create one
dev, err = deviceDB.CreateDevice(
ctx, acc.Localpart, nil, token, r.InitialDisplayName,
)
}
return
}

View file

@ -18,7 +18,9 @@ import (
"context"
"errors"
"net/http"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
@ -27,7 +29,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -40,15 +42,25 @@ var errMissingUserID = errors.New("'user_id' must be supplied")
func SendMembership(
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
roomID string, membership string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer,
queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
) util.JSONResponse {
var body threepid.MembershipRequest
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
return *reqErr
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
inviteStored, err := threepid.CheckAndProcessInvite(
req, device, &body, cfg, queryAPI, accountDB, producer, membership, roomID,
req.Context(), device, &body, cfg, queryAPI, accountDB, producer,
membership, roomID, evTime,
)
if err == threepid.ErrMissingParameter {
return util.JSONResponse{
@ -80,7 +92,7 @@ func SendMembership(
}
event, err := buildMembershipEvent(
req, body, accountDB, device, membership, roomID, cfg, queryAPI,
req.Context(), body, accountDB, device, membership, roomID, cfg, evTime, queryAPI, asAPI,
)
if err == errMissingUserID {
return util.JSONResponse{
@ -109,17 +121,19 @@ func SendMembership(
}
func buildMembershipEvent(
req *http.Request,
ctx context.Context,
body threepid.MembershipRequest, accountDB *accounts.Database,
device *authtypes.Device, membership string, roomID string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI,
device *authtypes.Device,
membership, roomID string,
cfg config.Dendrite, evTime time.Time,
queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
) (*gomatrixserverlib.Event, error) {
stateKey, reason, err := getMembershipStateKey(body, device, membership)
if err != nil {
return nil, err
}
profile, err := loadProfile(req.Context(), stateKey, cfg, accountDB)
profile, err := loadProfile(ctx, stateKey, cfg, accountDB, asAPI)
if err != nil {
return nil, err
}
@ -147,7 +161,7 @@ func buildMembershipEvent(
return nil, err
}
return common.BuildEvent(req, &builder, cfg, queryAPI, nil)
return common.BuildEvent(ctx, &builder, cfg, evTime, queryAPI, nil)
}
// loadProfile lookups the profile of a given user from the database and returns
@ -155,16 +169,20 @@ func buildMembershipEvent(
// Returns an error if the retrieval failed or if the first parameter isn't a
// valid Matrix ID.
func loadProfile(
ctx context.Context, userID string, cfg config.Dendrite, accountDB *accounts.Database,
ctx context.Context,
userID string,
cfg config.Dendrite,
accountDB *accounts.Database,
asAPI appserviceAPI.AppServiceQueryAPI,
) (*authtypes.Profile, error) {
localpart, serverName, err := gomatrixserverlib.SplitID('@', userID)
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return nil, err
}
var profile *authtypes.Profile
if serverName == cfg.Matrix.ServerName {
profile, err = accountDB.GetProfileByLocalpart(ctx, localpart)
profile, err = appserviceAPI.RetreiveUserProfile(ctx, userID, asAPI, accountDB)
} else {
profile = &authtypes.Profile{}
}

View file

@ -15,9 +15,11 @@
package routing
import (
"database/sql"
"context"
"net/http"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
@ -33,7 +35,7 @@ import (
// GetProfile implements GET /profile/{userID}
func GetProfile(
req *http.Request, accountDB *accounts.Database, userID string,
req *http.Request, accountDB *accounts.Database, userID string, asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
if req.Method != http.MethodGet {
return util.JSONResponse{
@ -41,10 +43,9 @@ func GetProfile(
JSON: jsonerror.NotFound("Bad method"),
}
}
profile, err := getProfileByUserID(req, accountDB, userID)
profile, err := appserviceAPI.RetreiveUserProfile(req.Context(), userID, asAPI, accountDB)
if err != nil {
return *err
return httputil.LogThenError(req, err)
}
res := common.ProfileResponse{
@ -57,37 +58,13 @@ func GetProfile(
}
}
// getProfileByUserID returns the profile for userID, otherwise returns an error response
func getProfileByUserID(
req *http.Request, accountDB *accounts.Database, userID string,
) (*authtypes.Profile, *util.JSONResponse) {
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
resErr := httputil.LogThenError(req, err)
return nil, &resErr
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err == sql.ErrNoRows {
return nil, &util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("no profile information for this user or this user does not exist"),
}
} else if err != nil {
resErr := httputil.LogThenError(req, err)
return nil, &resErr
}
return profile, nil
}
// GetAvatarURL implements GET /profile/{userID}/avatar_url
func GetAvatarURL(
req *http.Request, accountDB *accounts.Database, userID string,
req *http.Request, accountDB *accounts.Database, userID string, asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
profile, err := getProfileByUserID(req, accountDB, userID)
profile, err := appserviceAPI.RetreiveUserProfile(req.Context(), userID, asAPI, accountDB)
if err != nil {
return *err
return httputil.LogThenError(req, err)
}
res := common.AvatarURL{
@ -130,6 +107,14 @@ func SetAvatarURL(
return httputil.LogThenError(req, err)
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
oldProfile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err != nil {
return httputil.LogThenError(req, err)
@ -150,7 +135,9 @@ func SetAvatarURL(
AvatarURL: r.AvatarURL,
}
events, err := buildMembershipEvents(req, memberships, newProfile, userID, cfg, queryAPI)
events, err := buildMembershipEvents(
req.Context(), memberships, newProfile, userID, cfg, evTime, queryAPI,
)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -171,13 +158,12 @@ func SetAvatarURL(
// GetDisplayName implements GET /profile/{userID}/displayname
func GetDisplayName(
req *http.Request, accountDB *accounts.Database, userID string,
req *http.Request, accountDB *accounts.Database, userID string, asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
profile, err := getProfileByUserID(req, accountDB, userID)
profile, err := appserviceAPI.RetreiveUserProfile(req.Context(), userID, asAPI, accountDB)
if err != nil {
return *err
return httputil.LogThenError(req, err)
}
res := common.DisplayName{
DisplayName: profile.DisplayName,
}
@ -218,6 +204,14 @@ func SetDisplayName(
return httputil.LogThenError(req, err)
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
oldProfile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err != nil {
return httputil.LogThenError(req, err)
@ -238,7 +232,9 @@ func SetDisplayName(
AvatarURL: oldProfile.AvatarURL,
}
events, err := buildMembershipEvents(req, memberships, newProfile, userID, cfg, queryAPI)
events, err := buildMembershipEvents(
req.Context(), memberships, newProfile, userID, cfg, evTime, queryAPI,
)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -258,10 +254,10 @@ func SetDisplayName(
}
func buildMembershipEvents(
req *http.Request,
ctx context.Context,
memberships []authtypes.Membership,
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
queryAPI api.RoomserverQueryAPI,
evTime time.Time, queryAPI api.RoomserverQueryAPI,
) ([]gomatrixserverlib.Event, error) {
evs := []gomatrixserverlib.Event{}
@ -284,7 +280,7 @@ func buildMembershipEvents(
return nil, err
}
event, err := common.BuildEvent(req, &builder, *cfg, queryAPI, nil)
event, err := common.BuildEvent(ctx, &builder, *cfg, evTime, queryAPI, nil)
if err != nil {
return nil, err
}

View file

@ -374,7 +374,13 @@ func validateApplicationService(
) (string, *util.JSONResponse) {
// Check if the token if the application service is valid with one we have
// registered in the config.
accessToken := req.URL.Query().Get("access_token")
accessToken, err := auth.ExtractAccessToken(req)
if err != nil {
return "", &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.MissingToken(err.Error()),
}
}
var matchedApplicationService *config.ApplicationService
for _, appservice := range cfg.Derived.ApplicationServices {
if appservice.ASToken == accessToken {
@ -419,22 +425,6 @@ func validateApplicationService(
return matchedApplicationService.ID, nil
}
// authTypeIsValid checks the registration authentication type of the request
// and returns true or false depending on whether the auth type is valid
func authTypeIsValid(authType *authtypes.LoginType, req *http.Request) bool {
// If no auth type is specified by the client, send back the list of available flows
if *authType == "" && req.URL.Query().Get("access_token") != "" {
// Assume this is an application service registering a user if an empty login
// type was provided alongside an access token
*authType = authtypes.LoginTypeApplicationService
} else if *authType == "" {
// Not an access token, and no login type. Send back the flows
return false
}
return true
}
// Register processes a /register request.
// http://matrix.org/speculator/spec/HEAD/client_server/unstable.html#post-matrix-client-unstable-register
func Register(
@ -474,16 +464,6 @@ func Register(
r.Username = strconv.FormatInt(id, 10)
}
// Check r.Auth.Type is correct for the client requesting (handles application
// services requesting without an auth type)
if !authTypeIsValid(&r.Auth.Type, req) {
return util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: newUserInteractiveResponse(sessionID,
cfg.Derived.Registration.Flows, cfg.Derived.Registration.Params),
}
}
// Squash username to all lowercase letters
r.Username = strings.ToLower(r.Username)
@ -562,7 +542,8 @@ func handleRegistrationFlow(
// Add SharedSecret to the list of completed registration stages
sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret)
case authtypes.LoginTypeApplicationService:
case "", authtypes.LoginTypeApplicationService:
// not passing a Auth.Type is allowed for ApplicationServices. So assume that as well
// Check application service register user request is valid.
// The application service's ID is returned if so.
appserviceID, err := validateApplicationService(cfg, req, r.Username)
@ -885,6 +866,7 @@ type availableResponse struct {
// RegisterAvailable checks if the username is already taken or invalid.
func RegisterAvailable(
req *http.Request,
cfg config.Dendrite,
accountDB *accounts.Database,
) util.JSONResponse {
username := req.URL.Query().Get("username")
@ -896,6 +878,17 @@ func RegisterAvailable(
return *err
}
// Check if this username is reserved by an application service
userID := userutil.MakeUserID(username, cfg.Matrix.ServerName)
for _, appservice := range cfg.Derived.ApplicationServices {
if appservice.IsInterestedInUserID(userID) {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UserInUse("Desired user ID is reserved by an application service."),
}
}
}
availability, availabilityErr := accountDB.CheckAccountAvailability(req.Context(), username)
if availabilityErr != nil {
return util.JSONResponse{
@ -906,7 +899,7 @@ func RegisterAvailable(
if !availability {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("A different user ID has already been registered for this session"),
JSON: jsonerror.UserInUse("Desired User ID is already taken."),
}
}

View file

@ -20,6 +20,7 @@ import (
"strings"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -29,7 +30,7 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -42,14 +43,17 @@ const pathPrefixUnstable = "/_matrix/client/unstable"
// to clients which need to make outbound HTTP requests.
func Setup(
apiMux *mux.Router, cfg config.Dendrite,
producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
producer *producers.RoomserverProducer,
queryAPI roomserverAPI.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
accountDB *accounts.Database,
deviceDB *devices.Database,
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer,
syncProducer *producers.SyncAPIProducer,
typingProducer *producers.TypingServerProducer,
transactionsCache *transactions.Cache,
) {
@ -77,7 +81,7 @@ func Setup(
r0mux.Handle("/createRoom",
common.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return CreateRoom(req, device, cfg, producer, accountDB, aliasAPI)
return CreateRoom(req, device, cfg, producer, accountDB, aliasAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
@ -91,7 +95,7 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/{membership:(?:join|kick|ban|unban|leave|invite)}",
common.MakeAuthAPI("membership", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, queryAPI, producer)
return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, queryAPI, asAPI, producer)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
@ -137,11 +141,11 @@ func Setup(
})).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/register/available", common.MakeExternalAPI("registerAvailable", func(req *http.Request) util.JSONResponse {
return RegisterAvailable(req, accountDB)
return RegisterAvailable(req, cfg, accountDB)
})).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/directory/room/{roomAlias}",
common.MakeAuthAPI("directory_room", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeExternalAPI("directory_room", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return DirectoryRoom(req, vars["roomAlias"], federation, &cfg, aliasAPI)
}),
@ -173,6 +177,19 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/typing/{userID}",
common.MakeAuthAPI("rooms_typing", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, typingProducer)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/account/whoami",
common.MakeAuthAPI("whoami", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return Whoami(req, device)
}),
).Methods(http.MethodGet, http.MethodOptions)
// Stub endpoints required by Riot
r0mux.Handle("/login",
@ -219,14 +236,14 @@ func Setup(
r0mux.Handle("/profile/{userID}",
common.MakeExternalAPI("profile", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return GetProfile(req, accountDB, vars["userID"])
return GetProfile(req, accountDB, vars["userID"], asAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/profile/{userID}/avatar_url",
common.MakeExternalAPI("profile_avatar_url", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return GetAvatarURL(req, accountDB, vars["userID"])
return GetAvatarURL(req, accountDB, vars["userID"], asAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
@ -242,7 +259,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/displayname",
common.MakeExternalAPI("profile_displayname", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return GetDisplayName(req, accountDB, vars["userID"])
return GetDisplayName(req, accountDB, vars["userID"], asAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
@ -357,13 +374,6 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/typing/{userID}",
common.MakeExternalAPI("rooms_typing", func(req *http.Request) util.JSONResponse {
// TODO: handling typing
return util.JSONResponse{Code: http.StatusOK, JSON: struct{}{}}
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/devices",
common.MakeAuthAPI("get_devices", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return GetDevicesByLocalpart(req, deviceDB, device)

View file

@ -55,50 +55,11 @@ func SendEvent(
}
}
// parse the incoming http request
userID := device.UserID
var r map[string]interface{} // must be a JSON object
resErr := httputil.UnmarshalJSONRequest(req, &r)
e, resErr := generateSendEvent(req, device, roomID, eventType, stateKey, cfg, queryAPI)
if resErr != nil {
return *resErr
}
// create the new event and set all the fields we can
builder := gomatrixserverlib.EventBuilder{
Sender: userID,
RoomID: roomID,
Type: eventType,
StateKey: stateKey,
}
err := builder.SetContent(r)
if err != nil {
return httputil.LogThenError(req, err)
}
var queryRes api.QueryLatestEventsAndStateResponse
e, err := common.BuildEvent(req, &builder, cfg, queryAPI, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room does not exist"),
}
} else if err != nil {
return httputil.LogThenError(req, err)
}
// check to see if this user can perform this operation
stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents))
for i := range queryRes.StateEvents {
stateEvents[i] = &queryRes.StateEvents[i]
}
provider := gomatrixserverlib.NewAuthEvents(stateEvents)
if err = gomatrixserverlib.Allowed(*e, &provider); err != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client?
}
}
var txnAndDeviceID *api.TransactionID
if txnID != nil {
txnAndDeviceID = &api.TransactionID{
@ -127,3 +88,66 @@ func SendEvent(
return res
}
func generateSendEvent(
req *http.Request,
device *authtypes.Device,
roomID, eventType string, stateKey *string,
cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI,
) (*gomatrixserverlib.Event, *util.JSONResponse) {
// parse the incoming http request
userID := device.UserID
var r map[string]interface{} // must be a JSON object
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return nil, resErr
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return nil, &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue(err.Error()),
}
}
// create the new event and set all the fields we can
builder := gomatrixserverlib.EventBuilder{
Sender: userID,
RoomID: roomID,
Type: eventType,
StateKey: stateKey,
}
err = builder.SetContent(r)
if err != nil {
resErr := httputil.LogThenError(req, err)
return nil, &resErr
}
var queryRes api.QueryLatestEventsAndStateResponse
e, err := common.BuildEvent(req.Context(), &builder, cfg, evTime, queryAPI, &queryRes)
if err == common.ErrRoomNoExists {
return nil, &util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room does not exist"),
}
} else if err != nil {
resErr := httputil.LogThenError(req, err)
return nil, &resErr
}
// check to see if this user can perform this operation
stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents))
for i := range queryRes.StateEvents {
stateEvents[i] = &queryRes.StateEvents[i]
}
provider := gomatrixserverlib.NewAuthEvents(stateEvents)
if err = gomatrixserverlib.Allowed(*e, &provider); err != nil {
return nil, &util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client?
}
}
return e, nil
}

View file

@ -0,0 +1,80 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"database/sql"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/util"
)
type typingContentJSON struct {
Typing bool `json:"typing"`
Timeout int64 `json:"timeout"`
}
// SendTyping handles PUT /rooms/{roomID}/typing/{userID}
// sends the typing events to client API typingProducer
func SendTyping(
req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB *accounts.Database,
typingProducer *producers.TypingServerProducer,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Cannot set another user's typing state"),
}
}
localpart, err := userutil.ParseUsernameParam(userID, nil)
if err != nil {
return httputil.LogThenError(req, err)
}
// Verify that the user is a member of this room
_, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID)
if err == sql.ErrNoRows {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("User not in this room"),
}
} else if err != nil {
return httputil.LogThenError(req, err)
}
// parse the incoming http request
var r typingContentJSON
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
if err = typingProducer.Send(
req.Context(), userID, roomID, r.Typing, r.Timeout,
); err != nil {
return httputil.LogThenError(req, err)
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -0,0 +1,34 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/util"
)
// whoamiResponse represents an response for a `whoami` request
type whoamiResponse struct {
UserID string `json:"user_id"`
}
// Whoami implements `/account/whoami` which enables client to query their account user id.
// https://matrix.org/docs/spec/client_server/r0.3.0.html#get-matrix-client-r0-account-whoami
func Whoami(req *http.Request, device *authtypes.Device) util.JSONResponse {
return util.JSONResponse{
Code: http.StatusOK,
JSON: whoamiResponse{UserID: device.UserID},
}
}

View file

@ -85,10 +85,11 @@ var (
// fills the Matrix ID in the request body so a normal invite membership event
// can be emitted.
func CheckAndProcessInvite(
req *http.Request,
ctx context.Context,
device *authtypes.Device, body *MembershipRequest, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, db *accounts.Database,
producer *producers.RoomserverProducer, membership string, roomID string,
evTime time.Time,
) (inviteStoredOnIDServer bool, err error) {
if membership != "invite" || (body.Address == "" && body.IDServer == "" && body.Medium == "") {
// If none of the 3PID-specific fields are supplied, it's a standard invite
@ -101,7 +102,7 @@ func CheckAndProcessInvite(
return
}
lookupRes, storeInviteRes, err := queryIDServer(req.Context(), db, cfg, device, body, roomID)
lookupRes, storeInviteRes, err := queryIDServer(ctx, db, cfg, device, body, roomID)
if err != nil {
return
}
@ -110,7 +111,9 @@ func CheckAndProcessInvite(
// No Matrix ID could be found for this 3PID, meaning that a
// "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes.
err = emit3PIDInviteEvent(req, body, storeInviteRes, device, roomID, cfg, queryAPI, producer)
err = emit3PIDInviteEvent(
ctx, body, storeInviteRes, device, roomID, cfg, queryAPI, producer, evTime,
)
inviteStoredOnIDServer = err == nil
return
@ -325,10 +328,11 @@ func checkIDServerSignatures(
// emit3PIDInviteEvent builds and sends a "m.room.third_party_invite" event.
// Returns an error if something failed in the process.
func emit3PIDInviteEvent(
req *http.Request,
ctx context.Context,
body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer,
evTime time.Time,
) error {
builder := &gomatrixserverlib.EventBuilder{
Sender: device.UserID,
@ -350,11 +354,11 @@ func emit3PIDInviteEvent(
}
var queryRes *api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(req, builder, cfg, queryAPI, queryRes)
event, err := common.BuildEvent(ctx, builder, cfg, evTime, queryAPI, queryRes)
if err != nil {
return err
}
_, err = producer.SendEvents(req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
return err
}

View file

@ -19,6 +19,8 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
)
func main() {
@ -33,12 +35,13 @@ func main() {
federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
asQuery := base.CreateHTTPAppServiceAPIs()
alias, input, query := base.CreateHTTPRoomserverAPIs()
cache := transactions.New()
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, cache,
alias, input, query, typingInputAPI, asQuery, transactions.New(),
)
base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))

View file

@ -32,10 +32,11 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := base.CreateHTTPRoomserverAPIs()
asQuery := base.CreateHTTPAppServiceAPIs()
federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing,
alias, input, query,
alias, input, query, asQuery,
)
base.SetupAndServeHTTP(string(base.Cfg.Listen.FederationAPI))

View file

@ -18,19 +18,20 @@ import (
"flag"
"net/http"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
@ -55,18 +56,21 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB,
federation, &keyRing, alias, input, query,
transactions.New(),
typingInputAPI, asQuery, transactions.New(),
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery)
federationsender.SetupFederationSenderComponent(base, federation, query)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
appservice.SetupAppServiceAPIComponent(base, accountDB, deviceDB, federation, alias, query, transactions.New())
httpHandler := common.WrapHandlerInCORS(base.APIMux)

View file

@ -0,0 +1,36 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
_ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/sirupsen/logrus"
)
func main() {
cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI")
defer func() {
if err := base.Close(); err != nil {
logrus.WithError(err).Warn("BaseDendrite close failed")
}
}()
typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
base.SetupAndServeHTTP(string(base.Cfg.Listen.TypingServer))
}

View file

@ -33,6 +33,7 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/sirupsen/logrus"
)
@ -100,6 +101,12 @@ func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (
return alias, input, query
}
// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing
// server over HTTP
func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI {
return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil)
}
// CreateDeviceDB creates a new instance of the device database. Should only be
// called once per component.
func (b *BaseDendrite) CreateDeviceDB() *devices.Database {

View file

@ -134,6 +134,8 @@ type Dendrite struct {
OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for sending account data from client API to sync API
OutputClientData Topic `yaml:"output_client_data"`
// Topic for typingserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"`
// Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"`
}
@ -203,6 +205,7 @@ type Dendrite struct {
RoomServer Address `yaml:"room_server"`
FederationSender Address `yaml:"federation_sender"`
PublicRoomsAPI Address `yaml:"public_rooms_api"`
TypingServer Address `yaml:"typing_server"`
} `yaml:"listen"`
// The config for tracing the dendrite servers.
@ -526,6 +529,7 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) {
}
checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent))
checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates))
}
@ -546,6 +550,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI))
checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI))
checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer))
checkNotEmpty(configErrs, "listen.typing_server", string(config.Listen.TypingServer))
}
// checkLogging verifies the parameters logging.* are valid.
@ -659,6 +664,15 @@ func (config *Dendrite) RoomServerURL() string {
return "http://" + string(config.Listen.RoomServer)
}
// TypingServerURL returns an HTTP URL for where the typing server is listening.
func (config *Dendrite) TypingServerURL() string {
// Hard code the typing server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return "http://" + string(config.Listen.TypingServer)
}
// SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
return config.Tracing.Jaeger.InitGlobalTracer(

View file

@ -45,6 +45,7 @@ kafka:
topics:
output_room_event: output.room
output_client_data: output.client
output_typing_event: output.typing
user_updates: output.user
database:
media_api: "postgresql:///media_api"
@ -59,6 +60,7 @@ listen:
federation_api: "localhost:7772"
sync_api: "localhost:7773"
media_api: "localhost:7774"
typing_server: "localhost:7778"
logging:
- type: "file"
level: "info"

View file

@ -18,8 +18,6 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/matrix-org/dendrite/common/config"
@ -40,18 +38,17 @@ var ErrRoomNoExists = errors.New("Room does not exist")
// the room doesn't exist
// Returns an error if something else went wrong
func BuildEvent(
req *http.Request,
builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite,
ctx context.Context,
builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite, evTime time.Time,
queryAPI api.RoomserverQueryAPI, queryRes *api.QueryLatestEventsAndStateResponse,
) (*gomatrixserverlib.Event, error) {
err := AddPrevEventsToEvent(req.Context(), builder, queryAPI, queryRes)
err := AddPrevEventsToEvent(ctx, builder, queryAPI, queryRes)
if err != nil {
return nil, err
}
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
eventTime := ParseTSParam(req)
event, err := builder.Build(eventID, eventTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
event, err := builder.Build(eventID, evTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
if err != nil {
return nil, err
}
@ -59,25 +56,6 @@ func BuildEvent(
return &event, nil
}
// ParseTSParam takes a req from an application service and parses a Time object
// from the req if it exists in the query parameters. If it doesn't exist, the
// current time is returned.
func ParseTSParam(req *http.Request) time.Time {
// Use the ts parameter's value for event time if present
tsStr := req.URL.Query().Get("ts")
if tsStr == "" {
return time.Now()
}
// The parameter exists, parse into a Time object
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return time.Unix(ts/1000, 0)
}
return time.Unix(ts/1000, 0)
}
// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder
func AddPrevEventsToEvent(
ctx context.Context,

View file

@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
// Make this configurable somehow?
cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output"
cfg.Kafka.Topics.UserUpdates = "test.user.output"
// TODO: Use different databases for the different schemas.
@ -103,6 +104,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Listen.RoomServer = assignAddress()
cfg.Listen.SyncAPI = assignAddress()
cfg.Listen.PublicRoomsAPI = assignAddress()
cfg.Listen.TypingServer = assignAddress()
return &cfg, port, nil
}

View file

@ -0,0 +1,34 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import "sort"
// UnsortedStringSliceEqual returns true if the slices have same length & elements.
// Does not modify the given slice.
func UnsortedStringSliceEqual(first, second []string) bool {
if len(first) != len(second) {
return false
}
a, b := first[:], second[:]
sort.Strings(a)
sort.Strings(b)
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

View file

@ -15,10 +15,12 @@
package federationapi
import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
// TODO: Are we really wanting to pull in the producer from clientapi
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/routing"
@ -33,14 +35,15 @@ func SetupFederationAPIComponent(
deviceDB *devices.Database,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
aliasAPI api.RoomserverAliasAPI,
inputAPI api.RoomserverInputAPI,
queryAPI api.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
routing.Setup(
base.APIMux, *base.Cfg, queryAPI, aliasAPI,
base.APIMux, *base.Cfg, queryAPI, aliasAPI, asAPI,
roomserverProducer, *keyRing, federation, accountsDB, deviceDB,
)
}

View file

@ -15,9 +15,9 @@
package routing
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -64,7 +64,7 @@ func MakeJoin(
}
var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(httpReq, &builder, cfg, query, &queryRes)
event, err := common.BuildEvent(httpReq.Context(), &builder, cfg, time.Now(), query, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
@ -95,7 +95,6 @@ func MakeJoin(
// SendJoin implements the /send_join API
func SendJoin(
ctx context.Context,
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg config.Dendrite,
@ -142,7 +141,7 @@ func SendJoin(
Message: event.Redact().JSON(),
AtTS: event.OriginServerTS(),
}}
verifyResults, err := keys.VerifyJSONs(ctx, verifyRequests)
verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests)
if err != nil {
return httputil.LogThenError(httpReq, err)
}
@ -156,7 +155,7 @@ func SendJoin(
// Fetch the state and auth chain. We do this before we send the events
// on, in case this fails.
var stateAndAuthChainRepsonse api.QueryStateAndAuthChainResponse
err = query.QueryStateAndAuthChain(ctx, &api.QueryStateAndAuthChainRequest{
err = query.QueryStateAndAuthChain(httpReq.Context(), &api.QueryStateAndAuthChainRequest{
PrevEventIDs: event.PrevEventIDs(),
AuthEventIDs: event.AuthEventIDs(),
RoomID: roomID,
@ -168,7 +167,9 @@ func SendJoin(
// Send the events to the room server.
// We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil,
)
if err != nil {
return httputil.LogThenError(httpReq, err)
}

View file

@ -15,6 +15,7 @@ package routing
import (
"encoding/json"
"net/http"
"time"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -61,7 +62,7 @@ func MakeLeave(
}
var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(httpReq, &builder, cfg, query, &queryRes)
event, err := common.BuildEvent(httpReq.Context(), &builder, cfg, time.Now(), query, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,

View file

@ -15,9 +15,9 @@
package routing
import (
"database/sql"
"net/http"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -32,6 +32,7 @@ func GetProfile(
httpReq *http.Request,
accountDB *accounts.Database,
cfg config.Dendrite,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
userID, field := httpReq.FormValue("user_id"), httpReq.FormValue("field")
@ -43,7 +44,7 @@ func GetProfile(
}
}
localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(httpReq, err)
}
@ -52,13 +53,8 @@ func GetProfile(
return httputil.LogThenError(httpReq, err)
}
profile, err := accountDB.GetProfileByLocalpart(httpReq.Context(), localpart)
if err == sql.ErrNoRows {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("no profile information for this user or this user does not exist"),
}
} else if err != nil {
profile, err := appserviceAPI.RetreiveUserProfile(httpReq.Context(), userID, asAPI, accountDB)
if err != nil {
return httputil.LogThenError(httpReq, err)
}

View file

@ -21,7 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -32,9 +32,9 @@ func RoomAliasToID(
httpReq *http.Request,
federation *gomatrixserverlib.FederationClient,
cfg config.Dendrite,
aliasAPI api.RoomserverAliasAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
) util.JSONResponse {
roomAlias := httpReq.FormValue("alias")
roomAlias := httpReq.FormValue("room_alias")
if roomAlias == "" {
return util.JSONResponse{
Code: http.StatusBadRequest,
@ -52,20 +52,20 @@ func RoomAliasToID(
var resp gomatrixserverlib.RespDirectory
if domain == cfg.Matrix.ServerName {
queryReq := api.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes api.GetRoomIDForAliasResponse
queryReq := roomserverAPI.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes roomserverAPI.GetRoomIDForAliasResponse
if err = aliasAPI.GetRoomIDForAlias(httpReq.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(httpReq, err)
}
if queryRes.RoomID == "" {
if queryRes.RoomID != "" {
// TODO: List servers that are aware of this room alias
resp = gomatrixserverlib.RespDirectory{
RoomID: queryRes.RoomID,
Servers: []gomatrixserverlib.ServerName{},
}
} else {
// If the response doesn't contain a non-empty string, return an error
// If no alias was found, return an error
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound(fmt.Sprintf("Room alias %s not found", roomAlias)),

View file

@ -16,15 +16,15 @@ package routing
import (
"net/http"
"time"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -38,8 +38,9 @@ const (
func Setup(
apiMux *mux.Router,
cfg config.Dendrite,
query api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
query roomserverAPI.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
@ -84,7 +85,7 @@ func Setup(
v1fedmux.Handle("/3pid/onbind", common.MakeExternalAPI("3pid_onbind",
func(req *http.Request) util.JSONResponse {
return CreateInvitesFrom3PIDInvites(req, query, cfg, producer, federation, accountDB)
return CreateInvitesFrom3PIDInvites(req, query, asAPI, cfg, producer, federation, accountDB)
},
)).Methods(http.MethodPost, http.MethodOptions)
@ -113,8 +114,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetState(
httpReq.Context(), request, cfg, query, time.Now(),
keys, vars["roomID"],
httpReq.Context(), request, query, vars["roomID"],
)
},
)).Methods(http.MethodGet)
@ -124,13 +124,12 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetStateIDs(
httpReq.Context(), request, cfg, query, time.Now(),
keys, vars["roomID"],
httpReq.Context(), request, query, vars["roomID"],
)
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/query/directory/", common.MakeFedAPI(
v1fedmux.Handle("/query/directory", common.MakeFedAPI(
"federation_query_room_alias", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
return RoomAliasToID(
@ -143,13 +142,13 @@ func Setup(
"federation_query_profile", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
return GetProfile(
httpReq, accountDB, cfg,
httpReq, accountDB, cfg, asAPI,
)
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/query/user_devices/{userID}", common.MakeFedAPI(
"federation_query_user_devices", cfg.Matrix.ServerName, keys,
v1fedmux.Handle("/user/devices/{userID}", common.MakeFedAPI(
"federation_user_devices", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetUserDevices(
@ -177,7 +176,7 @@ func Setup(
roomID := vars["roomID"]
userID := vars["userID"]
return SendJoin(
httpReq.Context(), httpReq, request, cfg, query, producer, keys, roomID, userID,
httpReq, request, cfg, query, producer, keys, roomID, userID,
)
},
)).Methods(http.MethodPut)

View file

@ -16,10 +16,8 @@ import (
"context"
"net/http"
"net/url"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -29,10 +27,7 @@ import (
func GetState(
ctx context.Context,
request *gomatrixserverlib.FederationRequest,
_ config.Dendrite,
query api.RoomserverQueryAPI,
_ time.Time,
_ gomatrixserverlib.KeyRing,
roomID string,
) util.JSONResponse {
eventID, err := parseEventIDParam(request)
@ -52,10 +47,7 @@ func GetState(
func GetStateIDs(
ctx context.Context,
request *gomatrixserverlib.FederationRequest,
_ config.Dendrite,
query api.RoomserverQueryAPI,
_ time.Time,
_ gomatrixserverlib.KeyRing,
roomID string,
) util.JSONResponse {
eventID, err := parseEventIDParam(request)

View file

@ -20,14 +20,16 @@ import (
"errors"
"fmt"
"net/http"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -57,7 +59,8 @@ var (
// CreateInvitesFrom3PIDInvites implements POST /_matrix/federation/v1/3pid/onbind
func CreateInvitesFrom3PIDInvites(
req *http.Request, queryAPI api.RoomserverQueryAPI, cfg config.Dendrite,
req *http.Request, queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI, cfg config.Dendrite,
producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient,
accountDB *accounts.Database,
) util.JSONResponse {
@ -69,7 +72,7 @@ func CreateInvitesFrom3PIDInvites(
evs := []gomatrixserverlib.Event{}
for _, inv := range body.Invites {
event, err := createInviteFrom3PIDInvite(
req, queryAPI, cfg, inv, federation, accountDB,
req.Context(), queryAPI, asAPI, cfg, inv, federation, accountDB,
)
if err != nil {
return httputil.LogThenError(req, err)
@ -95,7 +98,7 @@ func ExchangeThirdPartyInvite(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
roomID string,
queryAPI api.RoomserverQueryAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
cfg config.Dendrite,
federation *gomatrixserverlib.FederationClient,
producer *producers.RoomserverProducer,
@ -134,7 +137,7 @@ func ExchangeThirdPartyInvite(
}
// Auth and build the event from what the remote server sent us
event, err := buildMembershipEvent(httpReq, &builder, queryAPI, cfg)
event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg)
if err == errNotInRoom {
return util.JSONResponse{
Code: http.StatusNotFound,
@ -169,11 +172,12 @@ func ExchangeThirdPartyInvite(
// Returns an error if there was a problem building the event or fetching the
// necessary data to do so.
func createInviteFrom3PIDInvite(
req *http.Request, queryAPI api.RoomserverQueryAPI, cfg config.Dendrite,
ctx context.Context, queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI, cfg config.Dendrite,
inv invite, federation *gomatrixserverlib.FederationClient,
accountDB *accounts.Database,
) (*gomatrixserverlib.Event, error) {
localpart, server, err := gomatrixserverlib.SplitID('@', inv.MXID)
_, server, err := gomatrixserverlib.SplitID('@', inv.MXID)
if err != nil {
return nil, err
}
@ -190,7 +194,7 @@ func createInviteFrom3PIDInvite(
StateKey: &inv.MXID,
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
profile, err := appserviceAPI.RetreiveUserProfile(ctx, inv.MXID, asAPI, accountDB)
if err != nil {
return nil, err
}
@ -208,9 +212,9 @@ func createInviteFrom3PIDInvite(
return nil, err
}
event, err := buildMembershipEvent(req, builder, queryAPI, cfg)
event, err := buildMembershipEvent(ctx, builder, queryAPI, cfg)
if err == errNotInRoom {
return nil, sendToRemoteServer(req.Context(), inv, federation, cfg, *builder)
return nil, sendToRemoteServer(ctx, inv, federation, cfg, *builder)
}
if err != nil {
return nil, err
@ -225,8 +229,8 @@ func createInviteFrom3PIDInvite(
// Returns errNotInRoom if the server is not in the room the invite is for.
// Returns an error if something failed during the process.
func buildMembershipEvent(
req *http.Request,
builder *gomatrixserverlib.EventBuilder, queryAPI api.RoomserverQueryAPI,
ctx context.Context,
builder *gomatrixserverlib.EventBuilder, queryAPI roomserverAPI.RoomserverQueryAPI,
cfg config.Dendrite,
) (*gomatrixserverlib.Event, error) {
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
@ -235,12 +239,12 @@ func buildMembershipEvent(
}
// Ask the roomserver for information about this room
queryReq := api.QueryLatestEventsAndStateRequest{
queryReq := roomserverAPI.QueryLatestEventsAndStateRequest{
RoomID: builder.RoomID,
StateToFetch: eventsNeeded.Tuples(),
}
var queryRes api.QueryLatestEventsAndStateResponse
if err = queryAPI.QueryLatestEventsAndState(req.Context(), &queryReq, &queryRes); err != nil {
var queryRes roomserverAPI.QueryLatestEventsAndStateResponse
if err = queryAPI.QueryLatestEventsAndState(ctx, &queryReq, &queryRes); err != nil {
return nil, err
}
@ -273,8 +277,9 @@ func buildMembershipEvent(
builder.AuthEvents = refs
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
eventTime := common.ParseTSParam(req)
event, err := builder.Build(eventID, eventTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
event, err := builder.Build(
eventID, time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
)
return &event, err
}
@ -306,7 +311,7 @@ func sendToRemoteServer(
if err == nil {
return
}
logrus.WithError(err).Warn("failed to send 3PID invite via %s", server)
logrus.WithError(err).Warnf("failed to send 3PID invite via %s", server)
}
return errors.New("failed to send 3PID invite via any server")

View file

@ -0,0 +1,96 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"gopkg.in/Shopify/sarama.v1"
)
// OutputTypingEventConsumer consumes events that originate in typing server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db *storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers.
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
c := &OutputTypingEventConsumer{
consumer: &consumer,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
}
consumer.ProcessMessage = c.onMessage
return c
}
// Start consuming from typing servers
func (t *OutputTypingEventConsumer) Start() error {
return t.consumer.Start()
}
// onMessage is called for OutputTypingEvent received from the typing servers.
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts.
func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("typingserver output log: message parse failed")
return nil
}
joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID)
if err != nil {
return err
}
names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}
edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
if edu.Content, err = json.Marshal(map[string]interface{}{
"room_id": ote.Event.RoomID,
"user_id": ote.Event.UserID,
"typing": ote.Event.Typing,
}); err != nil {
return err
}
return t.queues.SendEDU(edu, t.ServerName, names)
}

View file

@ -38,11 +38,18 @@ func SetupFederationSenderComponent(
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)
consumer := consumers.NewOutputRoomEventConsumer(
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
federationSenderDB, queryAPI,
)
if err = consumer.Start(); err != nil {
if err = rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}
tsConsumer := consumers.NewOutputTypingEventConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
}

View file

@ -33,12 +33,13 @@ type destinationQueue struct {
origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName
// The running mutex protects running, sentCounter, lastTransactionIDs and
// pendingEvents.
// pendingEvents, pendingEDUs.
runningMutex sync.Mutex
running bool
sentCounter int
lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.Event
pendingEDUs []*gomatrixserverlib.EDU
}
// Send event adds the event to the pending queue for the destination.
@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
}
}
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending event to that destination.
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEDUs = append(oq.pendingEDUs, e)
if !oq.running {
oq.running = true
go oq.backgroundSend()
}
}
func (oq *destinationQueue) backgroundSend() {
for {
t := oq.next()
@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() {
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
if len(oq.pendingEvents) == 0 {
if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
oq.running = false
return nil
}
var t gomatrixserverlib.Transaction
now := gomatrixserverlib.AsTimestamp(time.Now())
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
@ -96,11 +112,20 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
if t.PreviousIDs == nil {
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
}
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
for _, pdu := range oq.pendingEvents {
t.PDUs = append(t.PDUs, *pdu)
}
oq.pendingEvents = nil
oq.sentCounter += len(t.PDUs)
for _, edu := range oq.pendingEDUs {
t.EDUs = append(t.EDUs, *edu)
}
oq.pendingEDUs = nil
oq.sentCounter += len(t.EDUs)
return &t
}

View file

@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent(
destinations []gomatrixserverlib.ServerName,
) error {
if origin != oqs.origin {
// TODO: Support virtual hosting by allowing us to send events using
// different origin server names.
// For now assume we are always asked to send as the single server configured
// in the dendrite config.
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q",
origin, oqs.origin,
@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent(
}
oqs.queues[destination] = oq
}
oq.sendEvent(ev)
}
return nil
}
// SendEDU sends an EDU event to the destinations
func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName,
) error {
if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q",
origin, oqs.origin,
)
}
// Remove our own server from the list of destinations.
destinations = filterDestinations(oqs.origin, destinations)
log.WithFields(log.Fields{
"destinations": destinations, "edu_type": e.Type,
}).Info("Sending EDU event")
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
for _, destination := range destinations {
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
origin: oqs.origin,
destination: destination,
client: oqs.client,
}
oqs.queues[destination] = oq
}
oq.sendEDU(e)
}
return nil
}

View file

@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts(
return err
}
func (s *joinedHostsStatements) selectJoinedHosts(
func (s *joinedHostsStatements) selectJoinedHostsWithTx(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]types.JoinedHost, error) {
stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
return joinedHostsFromStmt(ctx, stmt, roomID)
}
func (s *joinedHostsStatements) selectJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
}
func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) {
rows, err := stmt.QueryContext(ctx, roomID)
if err != nil {
return nil, err
@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts(
ServerName: gomatrixserverlib.ServerName(serverName),
})
}
return result, nil
}

View file

@ -92,7 +92,7 @@ func (d *Database) UpdateRoom(
}
}
joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
@ -110,3 +110,12 @@ func (d *Database) UpdateRoom(
})
return
}
// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}

View file

@ -21,9 +21,10 @@ import (
"net/http"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -44,19 +45,20 @@ type RoomserverAliasAPIDatabase interface {
RemoveRoomAlias(ctx context.Context, alias string) error
}
// RoomserverAliasAPI is an implementation of api.RoomserverAliasAPI
// RoomserverAliasAPI is an implementation of alias.RoomserverAliasAPI
type RoomserverAliasAPI struct {
DB RoomserverAliasAPIDatabase
Cfg *config.Dendrite
InputAPI api.RoomserverInputAPI
QueryAPI api.RoomserverQueryAPI
DB RoomserverAliasAPIDatabase
Cfg *config.Dendrite
InputAPI roomserverAPI.RoomserverInputAPI
QueryAPI roomserverAPI.RoomserverQueryAPI
AppserviceAPI appserviceAPI.AppServiceQueryAPI
}
// SetRoomAlias implements api.RoomserverAliasAPI
// SetRoomAlias implements alias.RoomserverAliasAPI
func (r *RoomserverAliasAPI) SetRoomAlias(
ctx context.Context,
request *api.SetRoomAliasRequest,
response *api.SetRoomAliasResponse,
request *roomserverAPI.SetRoomAliasRequest,
response *roomserverAPI.SetRoomAliasResponse,
) error {
// Check if the alias isn't already referring to a room
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
@ -82,11 +84,11 @@ func (r *RoomserverAliasAPI) SetRoomAlias(
return r.sendUpdatedAliasesEvent(context.TODO(), request.UserID, request.RoomID)
}
// GetRoomIDForAlias implements api.RoomserverAliasAPI
// GetRoomIDForAlias implements alias.RoomserverAliasAPI
func (r *RoomserverAliasAPI) GetRoomIDForAlias(
ctx context.Context,
request *api.GetRoomIDForAliasRequest,
response *api.GetRoomIDForAliasResponse,
request *roomserverAPI.GetRoomIDForAliasRequest,
response *roomserverAPI.GetRoomIDForAliasResponse,
) error {
// Look up the room ID in the database
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
@ -94,15 +96,23 @@ func (r *RoomserverAliasAPI) GetRoomIDForAlias(
return err
}
// No rooms found locally, try our application services by making a call to
// the appservice component
aliasReq := appserviceAPI.RoomAliasExistsRequest{Alias: request.Alias}
var aliasResp appserviceAPI.RoomAliasExistsResponse
if err = r.AppserviceAPI.RoomAliasExists(ctx, &aliasReq, &aliasResp); err != nil {
return err
}
response.RoomID = roomID
return nil
}
// GetAliasesForRoomID implements api.RoomserverAliasAPI
// GetAliasesForRoomID implements alias.RoomserverAliasAPI
func (r *RoomserverAliasAPI) GetAliasesForRoomID(
ctx context.Context,
request *api.GetAliasesForRoomIDRequest,
response *api.GetAliasesForRoomIDResponse,
request *roomserverAPI.GetAliasesForRoomIDRequest,
response *roomserverAPI.GetAliasesForRoomIDResponse,
) error {
// Look up the aliases in the database for the given RoomID
aliases, err := r.DB.GetAliasesForRoomID(ctx, request.RoomID)
@ -114,11 +124,11 @@ func (r *RoomserverAliasAPI) GetAliasesForRoomID(
return nil
}
// RemoveRoomAlias implements api.RoomserverAliasAPI
// RemoveRoomAlias implements alias.RoomserverAliasAPI
func (r *RoomserverAliasAPI) RemoveRoomAlias(
ctx context.Context,
request *api.RemoveRoomAliasRequest,
response *api.RemoveRoomAliasResponse,
request *roomserverAPI.RemoveRoomAliasRequest,
response *roomserverAPI.RemoveRoomAliasResponse,
) error {
// Look up the room ID in the database
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
@ -177,11 +187,11 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(
if err != nil {
return err
}
req := api.QueryLatestEventsAndStateRequest{
req := roomserverAPI.QueryLatestEventsAndStateRequest{
RoomID: roomID,
StateToFetch: eventsNeeded.Tuples(),
}
var res api.QueryLatestEventsAndStateResponse
var res roomserverAPI.QueryLatestEventsAndStateResponse
if err = r.QueryAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
return err
}
@ -213,16 +223,16 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(
}
// Create the request
ire := api.InputRoomEvent{
Kind: api.KindNew,
ire := roomserverAPI.InputRoomEvent{
Kind: roomserverAPI.KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: serverName,
}
inputReq := api.InputRoomEventsRequest{
InputRoomEvents: []api.InputRoomEvent{ire},
inputReq := roomserverAPI.InputRoomEventsRequest{
InputRoomEvents: []roomserverAPI.InputRoomEvent{ire},
}
var inputRes api.InputRoomEventsResponse
var inputRes roomserverAPI.InputRoomEventsResponse
// Send the request
return r.InputAPI.InputRoomEvents(ctx, &inputReq, &inputRes)
@ -231,10 +241,10 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(
// SetupHTTP adds the RoomserverAliasAPI handlers to the http.ServeMux.
func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.RoomserverSetRoomAliasPath,
roomserverAPI.RoomserverSetRoomAliasPath,
common.MakeInternalAPI("setRoomAlias", func(req *http.Request) util.JSONResponse {
var request api.SetRoomAliasRequest
var response api.SetRoomAliasResponse
var request roomserverAPI.SetRoomAliasRequest
var response roomserverAPI.SetRoomAliasResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
@ -245,10 +255,10 @@ func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
}),
)
servMux.Handle(
api.RoomserverGetRoomIDForAliasPath,
roomserverAPI.RoomserverGetRoomIDForAliasPath,
common.MakeInternalAPI("GetRoomIDForAlias", func(req *http.Request) util.JSONResponse {
var request api.GetRoomIDForAliasRequest
var response api.GetRoomIDForAliasResponse
var request roomserverAPI.GetRoomIDForAliasRequest
var response roomserverAPI.GetRoomIDForAliasResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
@ -259,10 +269,10 @@ func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
}),
)
servMux.Handle(
api.RoomserverRemoveRoomAliasPath,
roomserverAPI.RoomserverRemoveRoomAliasPath,
common.MakeInternalAPI("removeRoomAlias", func(req *http.Request) util.JSONResponse {
var request api.RemoveRoomAliasRequest
var response api.RemoveRoomAliasResponse
var request roomserverAPI.RemoveRoomAliasRequest
var response roomserverAPI.RemoveRoomAliasResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}

View file

@ -19,8 +19,7 @@ import (
"encoding/json"
"testing"
"sort"
"github.com/matrix-org/dendrite/common/test"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -90,24 +89,6 @@ func (db *getEventDB) EventsFromIDs(ctx context.Context, eventIDs []string) (res
return
}
// Returns if the slices are equal after sorting them.
func compareUnsortedStringSlices(a []string, b []string) bool {
if len(a) != len(b) {
return false
}
sort.Strings(a)
sort.Strings(b)
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
func TestGetAuthChainSingle(t *testing.T) {
db := createEventDB()
@ -135,7 +116,7 @@ func TestGetAuthChainSingle(t *testing.T) {
expectedIDs := []string{"a", "b", "c", "d", "e"}
if !compareUnsortedStringSlices(expectedIDs, returnedIDs) {
if !test.UnsortedStringSliceEqual(expectedIDs, returnedIDs) {
t.Fatalf("returnedIDs got '%v', expected '%v'", returnedIDs, expectedIDs)
}
}
@ -168,7 +149,7 @@ func TestGetAuthChainMultiple(t *testing.T) {
expectedIDs := []string{"a", "b", "c", "d", "e", "f"}
if !compareUnsortedStringSlices(expectedIDs, returnedIDs) {
if !test.UnsortedStringSliceEqual(expectedIDs, returnedIDs) {
t.Fatalf("returnedIDs got '%v', expected '%v'", returnedIDs, expectedIDs)
}
}

View file

@ -19,6 +19,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/roomserver/alias"
"github.com/matrix-org/dendrite/roomserver/input"
@ -51,11 +52,14 @@ func SetupRoomServerComponent(
queryAPI.SetupHTTP(http.DefaultServeMux)
asAPI := asQuery.AppServiceQueryAPI{Cfg: base.Cfg}
aliasAPI := alias.RoomserverAliasAPI{
DB: roomserverDB,
Cfg: base.Cfg,
InputAPI: &inputAPI,
QueryAPI: &queryAPI,
DB: roomserverDB,
Cfg: base.Cfg,
InputAPI: &inputAPI,
QueryAPI: &queryAPI,
AppserviceAPI: &asAPI,
}
aliasAPI.SetupHTTP(http.DefaultServeMux)

View file

@ -0,0 +1,83 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package api provides the types that are used to communicate with the typing server.
package api
import (
"context"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go"
)
// InputTypingEvent is an event for notifying the typing server about typing updates.
type InputTypingEvent struct {
// UserID of the user to update typing status.
UserID string `json:"user_id"`
// RoomID of the room the user is typing (or has stopped).
RoomID string `json:"room_id"`
// Typing is true if the user is typing, false if they have stopped.
Typing bool `json:"typing"`
// Timeout is the interval for which the user should be marked as typing.
Timeout int64 `json:"timeout"`
// OriginServerTS when the server received the update.
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
}
// InputTypingEventRequest is a request to TypingServerInputAPI
type InputTypingEventRequest struct {
InputTypingEvent InputTypingEvent `json:"input_typing_event"`
}
// InputTypingEventResponse is a response to InputTypingEvents
type InputTypingEventResponse struct{}
// TypingServerInputAPI is used to write events to the typing server.
type TypingServerInputAPI interface {
InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
response *InputTypingEventResponse,
) error
}
// TypingServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
const TypingServerInputTypingEventPath = "/api/typingserver/input"
// NewTypingServerInputAPIHTTP creates a TypingServerInputAPI implemented by talking to a HTTP POST API.
func NewTypingServerInputAPIHTTP(typingServerURL string, httpClient *http.Client) TypingServerInputAPI {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &httpTypingServerInputAPI{typingServerURL, httpClient}
}
type httpTypingServerInputAPI struct {
typingServerURL string
httpClient *http.Client
}
// InputRoomEvents implements TypingServerInputAPI
func (h *httpTypingServerInputAPI) InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
response *InputTypingEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
defer span.Finish()
apiURL := h.typingServerURL + TypingServerInputTypingEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -0,0 +1,31 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
// OutputTypingEvent is an entry in typing server output kafka log.
// This contains the event with extra fields used to create 'm.typing' event
// in clientapi & federation.
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event TypingEvent `json:"event"`
// Users typing in the room when the event was generated.
TypingUsers []string `json:"typing_users"`
}
// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}

View file

@ -0,0 +1,108 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"sync"
"time"
)
const defaultTypingTimeout = 10 * time.Second
// userSet is a map of user IDs to a timer, timer fires at expiry.
type userSet map[string]*time.Timer
// TypingCache maintains a list of users typing in each room.
type TypingCache struct {
sync.RWMutex
data map[string]userSet
}
// NewTypingCache returns a new TypingCache initialized for use.
func NewTypingCache() *TypingCache {
return &TypingCache{data: make(map[string]userSet)}
}
// GetTypingUsers returns the list of users typing in a room.
func (t *TypingCache) GetTypingUsers(roomID string) (users []string) {
t.RLock()
usersMap, ok := t.data[roomID]
t.RUnlock()
if ok {
users = make([]string, 0, len(usersMap))
for userID := range usersMap {
users = append(users, userID)
}
}
return
}
// AddTypingUser sets an user as typing in a room.
// expire is the time when the user typing should time out.
// if expire is nil, defaultTypingTimeout is assumed.
func (t *TypingCache) AddTypingUser(userID, roomID string, expire *time.Time) {
expireTime := getExpireTime(expire)
if until := time.Until(expireTime); until > 0 {
timer := time.AfterFunc(until, t.timeoutCallback(userID, roomID))
t.addUser(userID, roomID, timer)
}
}
// addUser with mutex lock & replace the previous timer.
func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) {
t.Lock()
defer t.Unlock()
if t.data[roomID] == nil {
t.data[roomID] = make(userSet)
}
// Stop the timer to cancel the call to timeoutCallback
if timer, ok := t.data[roomID][userID]; ok {
// It may happen that at this stage timer fires but now we have a lock on t.
// Hence the execution of timeoutCallback will happen after we unlock.
// So we may lose a typing state, though this event is highly unlikely.
// This can be mitigated by keeping another time.Time in the map and check against it
// before removing. This however is not required in most practical scenario.
timer.Stop()
}
t.data[roomID][userID] = expiryTimer
}
// Returns a function which is called after timeout happens.
// This removes the user.
func (t *TypingCache) timeoutCallback(userID, roomID string) func() {
return func() {
t.RemoveUser(userID, roomID)
}
}
// RemoveUser with mutex lock & stop the timer.
func (t *TypingCache) RemoveUser(userID, roomID string) {
t.Lock()
defer t.Unlock()
if timer, ok := t.data[roomID][userID]; ok {
timer.Stop()
delete(t.data[roomID], userID)
}
}
func getExpireTime(expire *time.Time) time.Time {
if expire != nil {
return *expire
}
return time.Now().Add(defaultTypingTimeout)
}

View file

@ -0,0 +1,99 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"testing"
"time"
"github.com/matrix-org/dendrite/common/test"
)
func TestTypingCache(t *testing.T) {
tCache := NewTypingCache()
if tCache == nil {
t.Fatal("NewTypingCache failed")
}
t.Run("AddTypingUser", func(t *testing.T) {
testAddTypingUser(t, tCache)
})
t.Run("GetTypingUsers", func(t *testing.T) {
testGetTypingUsers(t, tCache)
})
t.Run("RemoveUser", func(t *testing.T) {
testRemoveUser(t, tCache)
})
}
func testAddTypingUser(t *testing.T, tCache *TypingCache) {
present := time.Now()
tests := []struct {
userID string
roomID string
expire *time.Time
}{ // Set four users typing state to room1
{"user1", "room1", nil},
{"user2", "room1", nil},
{"user3", "room1", nil},
{"user4", "room1", nil},
//typing state with past expireTime should not take effect or removed.
{"user1", "room2", &present},
}
for _, tt := range tests {
tCache.AddTypingUser(tt.userID, tt.roomID, tt.expire)
}
}
func testGetTypingUsers(t *testing.T, tCache *TypingCache) {
tests := []struct {
roomID string
wantUsers []string
}{
{"room1", []string{"user1", "user2", "user3", "user4"}},
{"room2", []string{}},
}
for _, tt := range tests {
gotUsers := tCache.GetTypingUsers(tt.roomID)
if !test.UnsortedStringSliceEqual(gotUsers, tt.wantUsers) {
t.Errorf("TypingCache.GetTypingUsers(%s) = %v, want %v", tt.roomID, gotUsers, tt.wantUsers)
}
}
}
func testRemoveUser(t *testing.T, tCache *TypingCache) {
tests := []struct {
roomID string
userIDs []string
}{
{"room3", []string{"user1"}},
{"room4", []string{"user1", "user2", "user3"}},
}
for _, tt := range tests {
for _, userID := range tt.userIDs {
tCache.AddTypingUser(userID, tt.roomID, nil)
}
length := len(tt.userIDs)
tCache.RemoveUser(tt.userIDs[length-1], tt.roomID)
expLeftUsers := tt.userIDs[:length-1]
if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) {
t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers)
}
}
}

View file

@ -0,0 +1,101 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package input
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"gopkg.in/Shopify/sarama.v1"
)
// TypingServerInputAPI implements api.TypingServerInputAPI
type TypingServerInputAPI struct {
// Cache to store the current typing members in each room.
Cache *cache.TypingCache
// The kafka topic to output new typing events to.
OutputTypingEventTopic string
// kafka producer
Producer sarama.SyncProducer
}
// InputTypingEvent implements api.TypingServerInputAPI
func (t *TypingServerInputAPI) InputTypingEvent(
ctx context.Context,
request *api.InputTypingEventRequest,
response *api.InputTypingEventResponse,
) error {
ite := &request.InputTypingEvent
if ite.Typing {
// user is typing, update our current state of users typing.
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
)
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
} else {
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
}
return t.sendEvent(ite)
}
func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
userIDs := t.Cache.GetTypingUsers(ite.RoomID)
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: ite.RoomID,
UserID: ite.UserID,
}
ote := &api.OutputTypingEvent{
Event: *ev,
TypingUsers: userIDs,
}
eventJSON, err := json.Marshal(ote)
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: string(t.OutputTypingEventTopic),
Key: sarama.StringEncoder(ite.RoomID),
Value: sarama.ByteEncoder(eventJSON),
}
_, _, err = t.Producer.SendMessage(m)
return err
}
// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux.
func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.TypingServerInputTypingEventPath,
common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
var request api.InputTypingEventRequest
var response api.InputTypingEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -0,0 +1,40 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package typingserver
import (
"net/http"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/dendrite/typingserver/input"
)
// SetupTypingServerComponent sets up and registers HTTP handlers for the
// TypingServer component. Returns instances of the various roomserver APIs,
// allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP.
func SetupTypingServerComponent(
base *basecomponent.BaseDendrite,
typingCache *cache.TypingCache,
) api.TypingServerInputAPI {
inputAPI := &input.TypingServerInputAPI{
Cache: typingCache,
Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
}
inputAPI.SetupHTTP(http.DefaultServeMux)
return inputAPI
}

28
vendor/manifest vendored
View file

@ -108,6 +108,19 @@
"revision": "392c28fe23e1c45ddba891b0320b3b5df220beea",
"branch": "master"
},
{
"importpath": "github.com/jaegertracing/jaeger-client-go",
"repository": "https://github.com/jaegertracing/jaeger-client-go",
"revision": "3ad49a1d839b517923a6fdac36d81cbf7b744f37",
"branch": "master"
},
{
"importpath": "github.com/jaegertracing/jaeger-lib/metrics",
"repository": "https://github.com/jaegertracing/jaeger-lib",
"revision": "21a3da6d66fe0e278072676fdc84cd4c9ccb9b67",
"branch": "master",
"path": "/metrics"
},
{
"importpath": "github.com/klauspost/crc32",
"repository": "https://github.com/klauspost/crc32",
@ -135,7 +148,7 @@
{
"importpath": "github.com/matrix-org/gomatrixserverlib",
"repository": "https://github.com/matrix-org/gomatrixserverlib",
"revision": "929828872b51e6733166553d6b1a20155b6ab829",
"revision": "677bbe93ffc9ad9ba5de615cd81185d0493f5d25",
"branch": "master"
},
{
@ -304,19 +317,6 @@
"revision": "54f72d32435d760d5604f17a82e2435b28dc4ba5",
"branch": "master"
},
{
"importpath": "github.com/jaegertracing/jaeger-client-go",
"repository": "https://github.com/jaegertracing/jaeger-client-go",
"revision": "3ad49a1d839b517923a6fdac36d81cbf7b744f37",
"branch": "master"
},
{
"importpath": "github.com/jaegertracing/jaeger-lib/metrics",
"repository": "https://github.com/jaegertracing/jaeger-lib",
"revision": "21a3da6d66fe0e278072676fdc84cd4c9ccb9b67",
"branch": "master",
"path": "/metrics"
},
{
"importpath": "github.com/uber/tchannel-go",
"repository": "https://github.com/uber/tchannel-go",

View file

@ -0,0 +1,23 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gomatrixserverlib
// EDU represents a EDU received via federation
// https://matrix.org/docs/spec/server_server/unstable.html#edus
type EDU struct {
Type string `json:"edu_type"`
Origin string `json:"origin"`
Destination string `json:"destination"`
Content RawJSON `json:"content"`
}

View file

@ -47,6 +47,8 @@ const (
MRoomHistoryVisibility = "m.room.history_visibility"
// MRoomRedaction https://matrix.org/docs/spec/client_server/r0.2.0.html#id21
MRoomRedaction = "m.room.redaction"
// MTyping https://matrix.org/docs/spec/client_server/r0.3.0.html#m-typing
MTyping = "m.typing"
)
// StateNeeded lists the event types and state_keys needed to authenticate an event.

0
vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/install.sh vendored Executable file → Normal file
View file

0
vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/pre-commit vendored Executable file → Normal file
View file

View file

@ -11,7 +11,7 @@
"structcheck",
"maligned",
"ineffassign",
"gas",
"gosec",
"misspell",
"gosimple",
"megacheck",

View file

@ -16,11 +16,14 @@ type Transaction struct {
// the destination server. Multiple transactions can be sent by the origin
// server to the destination server in parallel so there may be more than
// one previous transaction.
PreviousIDs []TransactionID `json:"previous_ids"`
PreviousIDs []TransactionID `json:"previous_ids,omitempty"`
// The room events pushed from the origin server to the destination server
// by this transaction. The events should either be events that originate
// on the origin server or be join m.room.member events.
PDUs []Event `json:"pdus"`
// The ephemeral events pushed from origin server to destination server
// by this transaction. The events must orginate at the origin server.
EDUs []EDU `json:"edus,omitempty"`
}
// A TransactionID identifies a transaction sent by a matrix server to another

0
vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh vendored Executable file → Normal file
View file