Merge pull request #1 from matrix-org/master

update from master
This commit is contained in:
Terrill Tsang 2018-07-27 11:18:20 +08:00 committed by GitHub
commit a8a6422d50
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
67 changed files with 2281 additions and 439 deletions

View file

@ -2,6 +2,7 @@ language: go
go:
- 1.8.x
- 1.9.x
- 1.10.x
env:
- TEST_SUITE="lint"

View file

@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later.
```
* Create databases:
```bash
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
@ -253,3 +253,14 @@ you want to support federation.
```bash
./bin/dendrite-federation-sender-server --config dendrite.yaml
```
### Run an appservice server
This sends events from the network to [application
services](https://matrix.org/docs/spec/application_service/unstable.html)
running locally. This is only required if you want to support running
application services on your homeserver.
```bash
./bin/dendrite-appservice-server --config dendrite.yaml
```

View file

@ -36,7 +36,11 @@ Diagram:
| | | | | | | |
| | | |>==========================>| | | |
| | | | +----------+ | |
| | | | | |
| | | | +---+ | |
| | | | +-------------| R | | |
| | | |>=====>| Application +---+ | |
| | | | | Services | | |
| | | | +--------------+ | |
| | | | +---+ | |
| | | | +--------| R | | |
| | | | | Client +---+ | |
@ -190,3 +194,36 @@ choke-point to implement ratelimiting and backoff correctly.
* Reads new events and the current state of the rooms from logs writeen by the Room Server.
* Reads the position of the read marker from the Receipts Server.
* Makes outbound HTTP hits to the push server for the client device.
## Application Service
* Receives events from the Room Server.
* Filters events and sends them to each registered application service.
* Runs a separate goroutine for each application service.
# Internal Component API
Some dendrite components use internal APIs to communicate information back
and forth between each other. There are two implementations of each API, one
that uses HTTP requests and one that does not. The HTTP implementation is
used in multi-process mode, so processes on separate computers may still
communicate, whereas in single-process or Monolith mode, the direct
implementation is used. HTTP is preferred here to kafka streams as it allows
for request responses.
Running `dendrite-monolith-server` will set up direct connections between
components, whereas running each individual component (which are only run in
multi-process mode) will set up HTTP-based connections.
The functions that make HTTP requests to internal APIs of a component are
located in `/<component name>/api/<name>.go`, named according to what
functionality they cover. Each of these requests are handled in `/<component
name>/<name>/<name>.go`.
As an example, the `appservices` component allows other Dendrite components
to query external application services via its internal API. A component
would call the desired function in `/appservices/api/query.go`. In
multi-process mode, this would send an internal HTTP request, which would
be handled by a function in `/appservices/query/query.go`. In single-process
mode, no internal HTTP request occurs, instead functions are simply called
directly, thus requiring no changes on the calling component's end.

View file

@ -98,6 +98,7 @@ database:
room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable"
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
encrypt_api: "postgres://dendrite:itsasecret@localhost/dendrite_encryptapi?sslmode=disable"
# If using naffka you need to specify a naffka database
@ -114,7 +115,8 @@ listen:
media_api: "localhost:7774"
public_rooms_api: "localhost:7775"
federation_sender: "localhost:7776"
appservice: "localhost:7777"
appservice_api: "localhost:7777"
typing_server: "localhost:7778"
# The configuration for tracing the dendrite components.
tracing:

View file

@ -2,9 +2,9 @@
This component interfaces with external [Application
Services](https://matrix.org/docs/spec/application_service/unstable.html).
This includes any HTTP endpoints that Application Services call, as well as talking
to any HTTP endpoints that Application Services provide themselves.
This includes any HTTP endpoints that application services call, as well as talking
to any HTTP endpoints that application services provide themselves.
## Consumers
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services.
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services.

View file

@ -0,0 +1,49 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package api contains methods used by dendrite components in multi-process
// mode to send requests to the appservice component, typically in order to ask
// an application service for some information.
package api
import (
"net/http"
)
// AppServiceQueryAPI is used to query user and room alias data from application
// services
type AppServiceQueryAPI interface {
// TODO: Check whether a room alias exists within any application service namespaces
// TODO: QueryUserIDExists
}
// httpAppServiceQueryAPI contains the URL to an appservice query API and a
// reference to a httpClient used to reach it
type httpAppServiceQueryAPI struct {
appserviceURL string
httpClient *http.Client
}
// NewAppServiceQueryAPIHTTP creates a AppServiceQueryAPI implemented by talking
// to a HTTP POST API.
// If httpClient is nil then it uses http.DefaultClient
func NewAppServiceQueryAPIHTTP(
appserviceURL string,
httpClient *http.Client,
) AppServiceQueryAPI {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &httpAppServiceQueryAPI{appserviceURL, httpClient}
}

View file

@ -15,12 +15,24 @@
package appservice
import (
"context"
"net/http"
"sync"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/appservice/routing"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@ -30,21 +42,93 @@ import (
func SetupAppServiceAPIComponent(
base *basecomponent.BaseDendrite,
accountsDB *accounts.Database,
deviceDB *devices.Database,
federation *gomatrixserverlib.FederationClient,
aliasAPI api.RoomserverAliasAPI,
queryAPI api.RoomserverQueryAPI,
roomserverAliasAPI roomserverAPI.RoomserverAliasAPI,
roomserverQueryAPI roomserverAPI.RoomserverQueryAPI,
transactionsCache *transactions.Cache,
) {
) appserviceAPI.AppServiceQueryAPI {
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to appservice db")
}
// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for i, appservice := range base.Cfg.Derived.ApplicationServices {
m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{
AppService: appservice,
Cond: sync.NewCond(&m),
}
workerStates[i] = ws
// Create bot account for this AS if it doesn't already exist
if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil {
logrus.WithFields(logrus.Fields{
"appservice": appservice.ID,
}).WithError(err).Panicf("failed to generate bot account for appservice")
}
}
// Create a HTTP client that this component will use for all outbound and
// inbound requests (inbound only for the internal API)
httpClient := &http.Client{
Timeout: time.Second * 30,
}
appserviceQueryAPI := query.AppServiceQueryAPI{
HTTPClient: httpClient,
Cfg: base.Cfg,
}
appserviceQueryAPI.SetupHTTP(http.DefaultServeMux)
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI,
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
roomserverQueryAPI, roomserverAliasAPI, workerStates,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
}
// Create application service transaction workers
if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
logrus.WithError(err).Panicf("failed to start app service transaction workers")
}
// Set up HTTP Endpoints
routing.Setup(
base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB,
federation, transactionsCache,
base.APIMux, *base.Cfg, roomserverQueryAPI, roomserverAliasAPI,
accountsDB, federation, transactionsCache,
)
return &appserviceQueryAPI
}
// generateAppServiceAccounts creates a dummy account based off the
// `sender_localpart` field of each application service if it doesn't
// exist already
func generateAppServiceAccount(
accountsDB *accounts.Database,
deviceDB *devices.Database,
as config.ApplicationService,
) error {
ctx := context.Background()
// Create an account for the application service
acc, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID)
if err != nil {
return err
} else if acc == nil {
// This account already exists
return nil
}
// Create a dummy device with a dummy token for the application service
_, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart)
return err
}

View file

@ -17,8 +17,9 @@ package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
@ -29,29 +30,28 @@ import (
sarama "gopkg.in/Shopify/sarama.v1"
)
var (
appServices []config.ApplicationService
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *accounts.Database
asDB *storage.Database
query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI
serverName string
workerStates []types.ApplicationServiceWorkerState
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store *accounts.Database,
appserviceDB *storage.Database,
queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
appServices = cfg.Derived.ApplicationServices
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
@ -60,9 +60,11 @@ func NewOutputRoomEventConsumer(
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
asDB: appserviceDB,
query: queryAPI,
alias: aliasAPI,
serverName: string(cfg.Matrix.ServerName),
workerStates: workerStates,
}
consumer.ProcessMessage = s.onMessage
@ -74,9 +76,8 @@ func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
// onMessage is called when the appservice component receives a new event from
// the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
@ -98,50 +99,37 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("appservice received event from roomserver")
}).Info("appservice received an event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil {
return err
}
events := append(missingEvents, ev)
// Create a context to thread through the whole filtering process
ctx := context.TODO()
if err = s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs); err != nil {
return err
}
// Check if any events need to passed on to external application services
return s.filterRoomserverEvents(ctx, append(events, ev))
// Send event to any relevant application services
return s.filterRoomserverEvents(context.TODO(), events)
}
// lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents(
// lookupMissingStateEvents looks up the state events that are added by a new event,
// and returns any not already present.
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
// If the event is a membership update (e.g. for a profile update), it won't
// show up in AddsStateEventIDs, so we need to add it manually
if event.Type() == "m.room.member" {
return []gomatrixserverlib.Event{event}, nil
}
return nil, nil
return []gomatrixserverlib.Event{}, nil
}
// Fast path if the only state event added is the event itself.
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
return []gomatrixserverlib.Event{event}, nil
return []gomatrixserverlib.Event{}, nil
}
result := []gomatrixserverlib.Event{}
missing := []string{}
for _, id := range addsStateEventIDs {
// Append the current event in the results if its ID is in the events list
if id == event.EventID() {
result = append(result, event)
} else {
if id != event.EventID() {
// If the event isn't the current one, add it to the list of events
// to retrieve from the roomserver
missing = append(missing, id)
@ -165,13 +153,22 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error {
for _, event := range events {
for _, appservice := range appServices {
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
events []gomatrixserverlib.Event,
) error {
for _, ws := range s.workerStates {
for _, event := range events {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, appservice) {
// TODO: Queue this event to be sent off to the application service
fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID())
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvents()
}
}
}
}
@ -188,18 +185,9 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
return false
}
// Check sender of the event
for _, userNamespace := range appservice.NamespaceMap["users"] {
if userNamespace.RegexpObject.MatchString(event.Sender()) {
return true
}
}
// Check room id of the event
for _, roomNamespace := range appservice.NamespaceMap["rooms"] {
if roomNamespace.RegexpObject.MatchString(event.RoomID()) {
return true
}
if appservice.IsInterestedInUserID(event.Sender()) ||
appservice.IsInterestedInRoomID(event.RoomID()) {
return true
}
// Check all known room aliases of the room the event came from
@ -207,10 +195,8 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
var queryRes api.GetAliasesForRoomIDResponse
if err := s.alias.GetAliasesForRoomID(ctx, &queryReq, &queryRes); err == nil {
for _, alias := range queryRes.Aliases {
for _, aliasNamespace := range appservice.NamespaceMap["aliases"] {
if aliasNamespace.RegexpObject.MatchString(alias) {
return true
}
if appservice.IsInterestedInRoomAlias(alias) {
return true
}
}
} else {

View file

@ -0,0 +1,34 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package query handles requests from other internal dendrite components when
// they interact with the AppServiceQueryAPI.
package query
import (
"net/http"
"github.com/matrix-org/dendrite/common/config"
)
// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
type AppServiceQueryAPI struct {
HTTPClient *http.Client
Cfg *config.Dendrite
}
// SetupHTTP adds the AppServiceQueryPAI handlers to the http.ServeMux. This
// handles and muxes incoming api requests the to internal AppServiceQueryAPI.
func (a *AppServiceQueryAPI) SetupHTTP(servMux *http.ServeMux) {
}

View file

@ -0,0 +1,248 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
const appserviceEventsSchema = `
-- Stores events to be sent to application services
CREATE TABLE IF NOT EXISTS appservice_events (
-- An auto-incrementing id unique to each event in the table
id BIGSERIAL NOT NULL PRIMARY KEY,
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- JSON representation of the event
event_json TEXT NOT NULL,
-- The ID of the transaction that this event is a part of
txn_id BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
"VALUES ($1, $2, $3)"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
const (
// A transaction ID number that no transaction should ever have. Used for
// checking again the default value.
invalidTxnID = -2
)
type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
}
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(appserviceEventsSchema)
if err != nil {
return
}
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return
}
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
return
}
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
return
}
return
}
// selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database
// once successfully sent to an application service.
func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context,
applicationServiceID string,
limit int,
) (
txnID, maxID int,
events []gomatrixserverlib.Event,
eventsRemaining bool,
err error,
) {
// Retrieve events from the database. Unsuccessfully sent events first
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
if err != nil {
return
}
defer func() {
err = eventRows.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": applicationServiceID,
}).WithError(err).Fatalf("appservice unable to select new events to send")
}
}()
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
if err != nil {
return
}
return
}
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
// Iterate through each row and store event contents
// If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first.
lastTxnID := invalidTxnID
for eventsProcessed := 0; eventRows.Next(); {
var event gomatrixserverlib.Event
var eventJSON []byte
var id int
err = eventRows.Scan(
&id,
&eventJSON,
&txnID,
)
if err != nil {
return nil, 0, 0, false, err
}
// Unmarshal eventJSON
if err = json.Unmarshal(eventJSON, &event); err != nil {
return nil, 0, 0, false, err
}
// If txnID has changed on this event from the previous event, then we've
// reached the end of a transaction's events. Return only those events.
if lastTxnID > invalidTxnID && lastTxnID != txnID {
return events, maxID, lastTxnID, true, nil
}
lastTxnID = txnID
// Limit events that aren't part of an old transaction
if txnID == -1 {
// Return if we've hit the limit
if eventsProcessed++; eventsProcessed > limit {
return events, maxID, lastTxnID, true, nil
}
}
if id > maxID {
maxID = id
}
// Portion of the event that is unsigned due to rapid change
// TODO: Consider removing age as not many app services use it
if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
return nil, 0, 0, false, err
}
events = append(events, event)
}
return
}
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) countEventsByApplicationServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return count, nil
}
// insertEvent inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) (err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.insertEventStmt.ExecContext(
ctx,
appServiceID,
eventJSON,
-1, // No transaction ID yet
)
return
}
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
// before sending them to an AppService. Referenced before sending to make sure
// we aren't constructing multiple transactions with the same events.
func (s *eventsStatements) updateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) (err error) {
_, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
return
}
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) (err error) {
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
return
}

View file

@ -0,0 +1,110 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
// Import postgres database driver
_ "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
)
// Database stores events intended to be later sent to application services
type Database struct {
events eventsStatements
txnID txnStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
if err := d.events.prepare(d.db); err != nil {
return err
}
return d.txnID.prepare(d.db)
}
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
// be sent to an application service given its ID.
func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.Event, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetLatestTxnID returns the latest available transaction id
func (d *Database) GetLatestTxnID(
ctx context.Context,
) (int, error) {
return d.txnID.selectTxnID(ctx)
}

View file

@ -0,0 +1,52 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
)
const txnIDSchema = `
-- Keeps a count of the current transaction ID
CREATE SEQUENCE IF NOT EXISTS txn_id_counter START 1;
`
const selectTxnIDSQL = "SELECT nextval('txn_id_counter')"
type txnStatements struct {
selectTxnIDStmt *sql.Stmt
}
func (s *txnStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(txnIDSchema)
if err != nil {
return
}
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
return
}
return
}
// selectTxnID selects the latest ascending transaction ID
func (s *txnStatements) selectTxnID(
ctx context.Context,
) (txnID int, err error) {
err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
return
}

View file

@ -12,7 +12,53 @@
package types
import (
"sync"
"github.com/matrix-org/dendrite/common/config"
)
const (
// AppServiceDeviceID is the AS dummy device ID
AppServiceDeviceID = "AS_Device"
)
// ApplicationServiceWorkerState is a type that couples an application service,
// a lockable condition as well as some other state variables, allowing the
// roomserver to notify appservice workers when there are events ready to send
// externally to application services.
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
// Events ready to be sent
EventsReady bool
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
}
// NotifyNewEvents wakes up all waiting goroutines, notifying that events remain
// in the event queue for this application service worker.
func (a *ApplicationServiceWorkerState) NotifyNewEvents() {
a.Cond.L.Lock()
a.EventsReady = true
a.Cond.Broadcast()
a.Cond.L.Unlock()
}
// FinishEventProcessing marks all events of this worker as being sent to the
// application service.
func (a *ApplicationServiceWorkerState) FinishEventProcessing() {
a.Cond.L.Lock()
a.EventsReady = false
a.Cond.L.Unlock()
}
// WaitForNewEvents causes the calling goroutine to wait on the worker state's
// condition for a broadcast or similar wakeup, if there are no events ready.
func (a *ApplicationServiceWorkerState) WaitForNewEvents() {
a.Cond.L.Lock()
if !a.EventsReady {
a.Cond.Wait()
}
a.Cond.L.Unlock()
}

View file

@ -0,0 +1,234 @@
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package workers
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math"
"net/http"
"time"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
var (
// Maximum size of events sent in each transaction.
transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60
)
// SetupTransactionWorkers spawns a separate goroutine for each application
// service. Each of these "workers" handle taking all events intended for their
// app service, batch them up into a single transaction (up to a max transaction
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers(
appserviceDB *storage.Database,
workerStates []types.ApplicationServiceWorkerState,
) error {
// Create a worker that handles transmitting events to a single homeserver
for _, workerState := range workerStates {
// Don't create a worker if this AS doesn't want to receive events
if workerState.AppService.URL != "" {
go worker(appserviceDB, workerState)
}
}
return nil
}
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).Info("starting application service")
ctx := context.Background()
// Create a HTTP client for sending requests to app services
client := &http.Client{
Timeout: transactionTimeout,
// TODO: Verify certificates
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // nolint: gas
},
},
}
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to read queued events from DB")
return
}
if eventCount > 0 {
ws.NotifyNewEvents()
}
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
ws.WaitForNewEvents()
// Batch events up into a transaction
transactionJSON, txnID, maxEventID, eventsRemaining, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("appservice worker unable to create transaction")
return
}
// Send the events off to the application service
// Backoff if the application service does not respond
err = send(client, ws.AppService, txnID, transactionJSON)
if err != nil {
// Backoff
backoff(&ws, err)
continue
}
// We sent successfully, hooray!
ws.Backoff = 0
// Transactions have a maximum event size, so there may still be some events
// left over to send. Keep sending until none are left
if !eventsRemaining {
ws.FinishEventProcessing()
}
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Fatal("unable to remove appservice events from the database")
return
}
}
}
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
}).WithError(err).Warnf("unable to send transactions successfully, backing off for %ds",
backoffDuration)
ws.Backoff++
if ws.Backoff > 6 {
ws.Backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
}
// createTransaction takes in a slice of AS events, stores them in an AS
// transaction, and JSON-encodes the results.
func createTransaction(
ctx context.Context,
db *storage.Database,
appserviceID string,
) (
transactionJSON []byte,
txnID, maxID int,
eventsRemaining bool,
err error,
) {
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, eventsRemaining, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.WithFields(log.Fields{
"appservice": appserviceID,
}).WithError(err).Fatalf("appservice worker unable to read queued events from DB")
return
}
// Check if these events do not already have a transaction ID
if txnID == -1 {
// If not, grab next available ID from the DB
txnID, err = db.GetLatestTxnID(ctx)
if err != nil {
return nil, 0, 0, false, err
}
// Mark new events with current transactionID
if err = db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, txnID); err != nil {
return nil, 0, 0, false, err
}
}
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
Events: events,
}
transactionJSON, err = json.Marshal(transaction)
if err != nil {
return
}
return
}
// send sends events to an application service. Returns an error if an OK was not
// received back from the application service or the request timed out.
func send(
client *http.Client,
appservice config.ApplicationService,
txnID int,
transaction []byte,
) error {
// POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": appservice.ID,
}).WithError(err).Error("unable to close response body from application service")
}
}()
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
// TODO: Handle non-200 error codes from application services
return fmt.Errorf("non-OK status code %d returned from AS", resp.StatusCode)
}
return nil
}

View file

@ -24,6 +24,7 @@ import (
"net/http"
"strings"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -48,26 +49,32 @@ type AccountDatabase interface {
GetAccountByLocalpart(ctx context.Context, localpart string) (*authtypes.Account, error)
}
// Data contains information required to authenticate a request.
type Data struct {
AccountDB AccountDatabase
DeviceDB DeviceDatabase
// AppServices is the list of all registered AS
AppServices []config.ApplicationService
}
// VerifyUserFromRequest authenticates the HTTP request,
// on success returns UserID of the requester.
// on success returns Device of the requester.
// Finds local user or an application service user.
// Note: For an AS user, AS dummy device is returned.
// On failure returns an JSON error response which can be sent to the client.
func VerifyUserFromRequest(
req *http.Request, accountDB AccountDatabase, deviceDB DeviceDatabase,
applicationServices []config.ApplicationService,
) (string, *util.JSONResponse) {
req *http.Request, data Data,
) (*authtypes.Device, *util.JSONResponse) {
// Try to find local user from device database
dev, devErr := VerifyAccessToken(req, deviceDB)
dev, devErr := verifyAccessToken(req, data.DeviceDB)
if devErr == nil {
return dev.UserID, nil
return dev, verifyUserParameters(req)
}
// Try to find the Application Service user
token, err := extractAccessToken(req)
token, err := ExtractAccessToken(req)
if err != nil {
return "", &util.JSONResponse{
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.MissingToken(err.Error()),
}
@ -75,7 +82,7 @@ func VerifyUserFromRequest(
// Search for app service with given access_token
var appService *config.ApplicationService
for _, as := range applicationServices {
for _, as := range data.AppServices {
if as.ASToken == token {
appService = &as
break
@ -83,41 +90,67 @@ func VerifyUserFromRequest(
}
if appService != nil {
// Create a dummy device for AS user
dev := authtypes.Device{
// Use AS dummy device ID
ID: types.AppServiceDeviceID,
// AS dummy device has AS's token.
AccessToken: token,
}
userID := req.URL.Query().Get("user_id")
localpart, err := userutil.ParseUsernameParam(userID, nil)
if err != nil {
return "", &util.JSONResponse{
return nil, &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername(err.Error()),
}
}
// Verify that the user is registered
account, accountErr := accountDB.GetAccountByLocalpart(req.Context(), localpart)
if localpart != "" { // AS is masquerading as another user
// Verify that the user is registered
account, err := data.AccountDB.GetAccountByLocalpart(req.Context(), localpart)
// Verify that account exists & appServiceID matches
if err == nil && account.AppServiceID == appService.ID {
// Set the userID of dummy device
dev.UserID = userID
return &dev, nil
}
// Verify that account exists & appServiceID matches
if accountErr == nil && account.AppServiceID == appService.ID {
return userID, nil
return nil, &util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Application service has not registered this user"),
}
}
return "", &util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Application service has not registered this user"),
}
// AS is not masquerading as any user, so use AS's sender_localpart
dev.UserID = appService.SenderLocalpart
return &dev, nil
}
return "", &util.JSONResponse{
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.UnknownToken("Unrecognized access token"),
}
}
// VerifyAccessToken verifies that an access token was supplied in the given HTTP request
// verifyUserParameters ensures that a request coming from a regular user is not
// using any query parameters reserved for an application service
func verifyUserParameters(req *http.Request) *util.JSONResponse {
if req.URL.Query().Get("ts") != "" {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown("parameter 'ts' not allowed without valid parameter 'access_token'"),
}
}
return nil
}
// verifyAccessToken verifies that an access token was supplied in the given HTTP request
// and returns the device it corresponds to. Returns resErr (an error response which can be
// sent to the client) if the token is invalid or there was a problem querying the database.
func VerifyAccessToken(req *http.Request, deviceDB DeviceDatabase) (device *authtypes.Device, resErr *util.JSONResponse) {
token, err := extractAccessToken(req)
func verifyAccessToken(req *http.Request, deviceDB DeviceDatabase) (device *authtypes.Device, resErr *util.JSONResponse) {
token, err := ExtractAccessToken(req)
if err != nil {
resErr = &util.JSONResponse{
Code: http.StatusUnauthorized,
@ -151,9 +184,9 @@ func GenerateAccessToken() (string, error) {
return base64.RawURLEncoding.EncodeToString(b), nil
}
// extractAccessToken from a request, or return an error detailing what went wrong. The
// ExtractAccessToken from a request, or return an error detailing what went wrong. The
// error message MUST be human-readable and comprehensible to the client.
func extractAccessToken(req *http.Request) (string, error) {
func ExtractAccessToken(req *http.Request) (string, error) {
// cf https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/api/auth.py#L631
authBearer := req.Header.Get("Authorization")
queryToken := req.URL.Query().Get("access_token")

View file

@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts (
created_ts BIGINT NOT NULL,
-- The password hash for this account. Can be NULL if this is a passwordless account.
password_hash TEXT,
-- Identifies which Application Service this account belongs to, if any.
-- Identifies which application service this account belongs to, if any.
appservice_id TEXT
-- TODO:
-- is_guest, is_admin, upgraded_ts, devices, any email reset stuff?

View file

@ -48,13 +48,17 @@ const insertMembershipSQL = `
const selectMembershipsByLocalpartSQL = "" +
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
const selectMembershipInRoomByLocalpartSQL = "" +
"SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2"
const deleteMembershipsByEventIDsSQL = "" +
"DELETE FROM account_memberships WHERE event_id = ANY($1)"
type membershipStatements struct {
deleteMembershipsByEventIDsStmt *sql.Stmt
insertMembershipStmt *sql.Stmt
selectMembershipsByLocalpartStmt *sql.Stmt
deleteMembershipsByEventIDsStmt *sql.Stmt
insertMembershipStmt *sql.Stmt
selectMembershipInRoomByLocalpartStmt *sql.Stmt
selectMembershipsByLocalpartStmt *sql.Stmt
}
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
@ -68,6 +72,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) {
if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil {
return
}
if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil {
return
}
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
return
}
@ -90,6 +97,16 @@ func (s *membershipStatements) deleteMembershipsByEventIDs(
return
}
func (s *membershipStatements) selectMembershipInRoomByLocalpart(
ctx context.Context, localpart, roomID string,
) (authtypes.Membership, error) {
membership := authtypes.Membership{Localpart: localpart, RoomID: roomID}
stmt := s.selectMembershipInRoomByLocalpartStmt
err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID)
return membership, err
}
func (s *membershipStatements) selectMembershipsByLocalpart(
ctx context.Context, localpart string,
) (memberships []authtypes.Membership, err error) {

View file

@ -185,6 +185,16 @@ func (d *Database) UpdateMemberships(
})
}
// GetMembershipInRoomByLocalpart returns the membership for an user
// matching the given localpart if he is a member of the room matching roomID,
// if not sql.ErrNoRows is returned.
// If there was an issue during the retrieval, returns the SQL error
func (d *Database) GetMembershipInRoomByLocalpart(
ctx context.Context, localpart, roomID string,
) (authtypes.Membership, error) {
return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID)
}
// GetMembershipsByLocalpart returns an array containing the memberships for all
// the rooms a user matching a given localpart is a member of
// If no membership match the given localpart, returns an empty array

View file

@ -138,9 +138,9 @@ func (d *Database) UpdateDevice(
}
// RemoveDevice revokes a device by deleting the entry in the database
// matching with the given device ID and user ID localpart
// matching with the given device ID and user ID localpart.
// If the device doesn't exist, it will not return an error
// If something went wrong during the deletion, it will return the SQL error
// If something went wrong during the deletion, it will return the SQL error.
func (d *Database) RemoveDevice(
ctx context.Context, deviceID, localpart string,
) error {

View file

@ -22,7 +22,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@ -35,12 +36,14 @@ func SetupClientAPIComponent(
accountsDB *accounts.Database,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
aliasAPI api.RoomserverAliasAPI,
inputAPI api.RoomserverInputAPI,
queryAPI api.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
typingInputAPI typingServerAPI.TypingServerInputAPI,
transactionsCache *transactions.Cache,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
typingProducer := producers.NewTypingServerProducer(typingInputAPI)
userUpdateProducer := &producers.UserUpdateProducer{
Producer: base.KafkaProducer,
@ -60,10 +63,8 @@ func SetupClientAPIComponent(
}
routing.Setup(
base.APIMux, *base.Cfg, roomserverProducer,
queryAPI, aliasAPI, accountsDB, deviceDB,
federation, *keyRing,
userUpdateProducer, syncProducer,
transactionsCache,
base.APIMux, *base.Cfg, roomserverProducer, queryAPI, aliasAPI,
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
syncProducer, typingProducer, transactionsCache,
)
}

View file

@ -25,7 +25,6 @@ import (
// UnmarshalJSONRequest into the given interface pointer. Returns an error JSON response if
// there was a problem unmarshalling. Calling this function consumes the request body.
func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONResponse {
defer req.Body.Close() // nolint: errcheck
if err := json.NewDecoder(req.Body).Decode(iface); err != nil {
// TODO: We may want to suppress the Error() return in production? It's useful when
// debugging because an error will be produced for both invalid/malformed JSON AND

View file

@ -0,0 +1,54 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"time"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// TypingServerProducer produces events for the typing server to consume
type TypingServerProducer struct {
InputAPI api.TypingServerInputAPI
}
// NewTypingServerProducer creates a new TypingServerProducer
func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer {
return &TypingServerProducer{
InputAPI: inputAPI,
}
}
// Send typing event to typing server
func (p *TypingServerProducer) Send(
ctx context.Context, userID, roomID string,
typing bool, timeout int64,
) error {
requestData := api.InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
Timeout: timeout,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
var response api.InputTypingEventResponse
err := p.InputAPI.InputTypingEvent(
ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response,
)
return err
}

View file

@ -18,7 +18,6 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/matrix-org/dendrite/roomserver/api"
@ -88,8 +87,7 @@ func (r createRoomRequest) Validate() *util.JSONResponse {
}
}
switch r.Preset {
case presetPrivateChat, presetTrustedPrivateChat, presetPublicChat:
break
case presetPrivateChat, presetTrustedPrivateChat, presetPublicChat, "":
default:
return &util.JSONResponse{
Code: http.StatusBadRequest,
@ -114,7 +112,8 @@ type fledglingEvent struct {
}
// CreateRoom implements /createRoom
func CreateRoom(req *http.Request, device *authtypes.Device,
func CreateRoom(
req *http.Request, device *authtypes.Device,
cfg config.Dendrite, producer *producers.RoomserverProducer,
accountDB *accounts.Database, aliasAPI api.RoomserverAliasAPI,
) util.JSONResponse {
@ -126,7 +125,8 @@ func CreateRoom(req *http.Request, device *authtypes.Device,
// createRoom implements /createRoom
// nolint: gocyclo
func createRoom(req *http.Request, device *authtypes.Device,
func createRoom(
req *http.Request, device *authtypes.Device,
cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer,
accountDB *accounts.Database, aliasAPI api.RoomserverAliasAPI,
) util.JSONResponse {
@ -181,6 +181,11 @@ func createRoom(req *http.Request, device *authtypes.Device,
case presetPublicChat:
joinRules = joinRulePublic
historyVisibility = historyVisibilityShared
default:
// Default room rules, r.Preset was previously checked for valid values so
// only a request with no preset should end up here.
joinRules = joinRuleInvite
historyVisibility = historyVisibilityShared
}
var builtEvents []gomatrixserverlib.Event
@ -244,7 +249,7 @@ func createRoom(req *http.Request, device *authtypes.Device,
builder.PrevEvents = []gomatrixserverlib.EventReference{builtEvents[i-1].EventReference()}
}
var ev *gomatrixserverlib.Event
ev, err = buildEvent(&builder, &authEvents, cfg)
ev, err = buildEvent(req, &builder, &authEvents, cfg)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -303,9 +308,12 @@ func createRoom(req *http.Request, device *authtypes.Device,
}
// buildEvent fills out auth_events for the builder then builds the event
func buildEvent(builder *gomatrixserverlib.EventBuilder,
func buildEvent(
req *http.Request,
builder *gomatrixserverlib.EventBuilder,
provider gomatrixserverlib.AuthEventProvider,
cfg config.Dendrite) (*gomatrixserverlib.Event, error) {
cfg config.Dendrite,
) (*gomatrixserverlib.Event, error) {
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
@ -317,8 +325,8 @@ func buildEvent(builder *gomatrixserverlib.EventBuilder,
}
builder.AuthEvents = refs
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
now := time.Now()
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
eventTime := common.ParseTSParam(req)
event, err := builder.Build(eventID, eventTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
if err != nil {
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %s", builder.Type, err)
}

View file

@ -115,9 +115,12 @@ func SetLocalAlias(
// Check that the alias does not fall within an exclusive namespace of an
// application service
// TODO: This code should eventually be refactored with:
// 1. The new method for checking for things matching an AS's namespace
// 2. Using an overall Regex object for all AS's just like we did for usernames
for _, appservice := range cfg.Derived.ApplicationServices {
if userNamespaces, ok := appservice.NamespaceMap["users"]; ok {
for _, namespace := range userNamespaces {
if aliasNamespaces, ok := appservice.NamespaceMap["aliases"]; ok {
for _, namespace := range aliasNamespaces {
if namespace.Exclusive && namespace.RegexpObject.MatchString(alias) {
return util.JSONResponse{
Code: http.StatusBadRequest,

View file

@ -18,7 +18,6 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -27,7 +26,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -42,8 +41,8 @@ func JoinRoomByIDOrAlias(
cfg config.Dendrite,
federation *gomatrixserverlib.FederationClient,
producer *producers.RoomserverProducer,
queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI,
keyRing gomatrixserverlib.KeyRing,
accountDB *accounts.Database,
) util.JSONResponse {
@ -87,8 +86,8 @@ type joinRoomReq struct {
cfg config.Dendrite
federation *gomatrixserverlib.FederationClient
producer *producers.RoomserverProducer
queryAPI api.RoomserverQueryAPI
aliasAPI api.RoomserverAliasAPI
queryAPI roomserverAPI.RoomserverQueryAPI
aliasAPI roomserverAPI.RoomserverAliasAPI
keyRing gomatrixserverlib.KeyRing
}
@ -100,10 +99,10 @@ func (r joinRoomReq) joinRoomByID(roomID string) util.JSONResponse {
// If the server is not in the room the we will need to look up the
// remote server the invite came from in order to request a join event
// from that server.
queryReq := api.QueryInvitesForUserRequest{
queryReq := roomserverAPI.QueryInvitesForUserRequest{
RoomID: roomID, TargetUserID: r.userID,
}
var queryRes api.QueryInvitesForUserResponse
var queryRes roomserverAPI.QueryInvitesForUserResponse
if err := r.queryAPI.QueryInvitesForUser(r.req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(r.req, err)
}
@ -145,8 +144,8 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
}
}
if domain == r.cfg.Matrix.ServerName {
queryReq := api.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes api.GetRoomIDForAliasResponse
queryReq := roomserverAPI.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes roomserverAPI.GetRoomIDForAliasResponse
if err = r.aliasAPI.GetRoomIDForAlias(r.req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(r.req, err)
}
@ -214,8 +213,8 @@ func (r joinRoomReq) joinRoomUsingServers(
return httputil.LogThenError(r.req, err)
}
var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes)
var queryRes roomserverAPI.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(r.req, &eb, r.cfg, r.queryAPI, &queryRes)
if err == nil {
if _, err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(r.req, err)
@ -285,10 +284,10 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
return nil, err
}
now := time.Now()
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.Matrix.ServerName)
eventTime := common.ParseTSParam(r.req)
event, err := respMakeJoin.JoinEvent.Build(
eventID, now, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey,
eventID, eventTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey,
)
if err != nil {
res := httputil.LogThenError(r.req, err)

View file

@ -48,8 +48,7 @@ func SendMembership(
}
inviteStored, err := threepid.CheckAndProcessInvite(
req.Context(),
device, &body, cfg, queryAPI, accountDB, producer, membership, roomID,
req, device, &body, cfg, queryAPI, accountDB, producer, membership, roomID,
)
if err == threepid.ErrMissingParameter {
return util.JSONResponse{
@ -81,7 +80,7 @@ func SendMembership(
}
event, err := buildMembershipEvent(
req.Context(), body, accountDB, device, membership, roomID, cfg, queryAPI,
req, body, accountDB, device, membership, roomID, cfg, queryAPI,
)
if err == errMissingUserID {
return util.JSONResponse{
@ -110,7 +109,7 @@ func SendMembership(
}
func buildMembershipEvent(
ctx context.Context,
req *http.Request,
body threepid.MembershipRequest, accountDB *accounts.Database,
device *authtypes.Device, membership string, roomID string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI,
@ -120,7 +119,7 @@ func buildMembershipEvent(
return nil, err
}
profile, err := loadProfile(ctx, stateKey, cfg, accountDB)
profile, err := loadProfile(req.Context(), stateKey, cfg, accountDB)
if err != nil {
return nil, err
}
@ -148,7 +147,7 @@ func buildMembershipEvent(
return nil, err
}
return common.BuildEvent(ctx, &builder, cfg, queryAPI, nil)
return common.BuildEvent(req, &builder, cfg, queryAPI, nil)
}
// loadProfile lookups the profile of a given user from the database and returns

View file

@ -15,7 +15,7 @@
package routing
import (
"context"
"database/sql"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -41,15 +41,12 @@ func GetProfile(
JSON: jsonerror.NotFound("Bad method"),
}
}
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
profile, err := getProfileByUserID(req, accountDB, userID)
if err != nil {
return httputil.LogThenError(req, err)
return *err
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
res := common.ProfileResponse{
AvatarURL: profile.AvatarURL,
DisplayName: profile.DisplayName,
@ -60,19 +57,39 @@ func GetProfile(
}
}
// getProfileByUserID returns the profile for userID, otherwise returns an error response
func getProfileByUserID(
req *http.Request, accountDB *accounts.Database, userID string,
) (*authtypes.Profile, *util.JSONResponse) {
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
resErr := httputil.LogThenError(req, err)
return nil, &resErr
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err == sql.ErrNoRows {
return nil, &util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("no profile information for this user or this user does not exist"),
}
} else if err != nil {
resErr := httputil.LogThenError(req, err)
return nil, &resErr
}
return profile, nil
}
// GetAvatarURL implements GET /profile/{userID}/avatar_url
func GetAvatarURL(
req *http.Request, accountDB *accounts.Database, userID string,
) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
profile, err := getProfileByUserID(req, accountDB, userID)
if err != nil {
return httputil.LogThenError(req, err)
return *err
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
res := common.AvatarURL{
AvatarURL: profile.AvatarURL,
}
@ -133,7 +150,7 @@ func SetAvatarURL(
AvatarURL: r.AvatarURL,
}
events, err := buildMembershipEvents(req.Context(), memberships, newProfile, userID, cfg, queryAPI)
events, err := buildMembershipEvents(req, memberships, newProfile, userID, cfg, queryAPI)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -156,15 +173,11 @@ func SetAvatarURL(
func GetDisplayName(
req *http.Request, accountDB *accounts.Database, userID string,
) util.JSONResponse {
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
profile, err := getProfileByUserID(req, accountDB, userID)
if err != nil {
return httputil.LogThenError(req, err)
return *err
}
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
res := common.DisplayName{
DisplayName: profile.DisplayName,
}
@ -225,7 +238,7 @@ func SetDisplayName(
AvatarURL: oldProfile.AvatarURL,
}
events, err := buildMembershipEvents(req.Context(), memberships, newProfile, userID, cfg, queryAPI)
events, err := buildMembershipEvents(req, memberships, newProfile, userID, cfg, queryAPI)
if err != nil {
return httputil.LogThenError(req, err)
}
@ -245,7 +258,7 @@ func SetDisplayName(
}
func buildMembershipEvents(
ctx context.Context,
req *http.Request,
memberships []authtypes.Membership,
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
queryAPI api.RoomserverQueryAPI,
@ -271,7 +284,7 @@ func buildMembershipEvents(
return nil, err
}
event, err := common.BuildEvent(ctx, &builder, *cfg, queryAPI, nil)
event, err := common.BuildEvent(req, &builder, *cfg, queryAPI, nil)
if err != nil {
return nil, err
}

View file

@ -39,6 +39,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
@ -115,6 +117,9 @@ type registerRequest struct {
InitialDisplayName *string `json:"initial_device_display_name"`
// Prevent this user from logging in
InhibitLogin common.WeakBoolean `json:"inhibit_login"`
// Application Services place Type in the root of their registration
// request, whereas clients place it in the authDict struct.
Type authtypes.LoginType `json:"type"`
@ -162,9 +167,9 @@ func newUserInteractiveResponse(
// http://matrix.org/speculator/spec/HEAD/client_server/unstable.html#post-matrix-client-unstable-register
type registerResponse struct {
UserID string `json:"user_id"`
AccessToken string `json:"access_token"`
AccessToken string `json:"access_token,omitempty"`
HomeServer gomatrixserverlib.ServerName `json:"home_server"`
DeviceID string `json:"device_id"`
DeviceID string `json:"device_id,omitempty"`
}
// recaptchaResponse represents the HTTP response from a Google Recaptcha server
@ -175,8 +180,8 @@ type recaptchaResponse struct {
ErrorCodes []int `json:"error-codes"`
}
// validateUserName returns an error response if the username is invalid
func validateUserName(username string) *util.JSONResponse {
// validateUsername returns an error response if the username is invalid
func validateUsername(username string) *util.JSONResponse {
// https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/rest/client/v2_alpha/register.py#L161
if len(username) > maxUsernameLength {
return &util.JSONResponse{
@ -186,12 +191,28 @@ func validateUserName(username string) *util.JSONResponse {
} else if !validUsernameRegex.MatchString(username) {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("User ID can only contain characters a-z, 0-9, or '_-./'"),
JSON: jsonerror.InvalidUsername("Username can only contain characters a-z, 0-9, or '_-./'"),
}
} else if username[0] == '_' { // Regex checks its not a zero length string
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("User ID can't start with a '_'"),
JSON: jsonerror.InvalidUsername("Username cannot start with a '_'"),
}
}
return nil
}
// validateApplicationServiceUsername returns an error response if the username is invalid for an application service
func validateApplicationServiceUsername(username string) *util.JSONResponse {
if len(username) > maxUsernameLength {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON(fmt.Sprintf("'username' >%d characters", maxUsernameLength)),
}
} else if !validUsernameRegex.MatchString(username) {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidUsername("Username can only contain characters a-z, 0-9, or '_-./'"),
}
}
return nil
@ -280,31 +301,31 @@ func validateRecaptcha(
return nil
}
// UsernameIsWithinApplicationServiceNamespace checks to see if a username falls
// within any of the namespaces of a given Application Service. If no
// UserIDIsWithinApplicationServiceNamespace checks to see if a given userID
// falls within any of the namespaces of a given Application Service. If no
// Application Service is given, it will check to see if it matches any
// Application Service's namespace.
func UsernameIsWithinApplicationServiceNamespace(
func UserIDIsWithinApplicationServiceNamespace(
cfg *config.Dendrite,
username string,
userID string,
appservice *config.ApplicationService,
) bool {
if appservice != nil {
// Loop through given Application Service's namespaces and see if any match
// Loop through given application service's namespaces and see if any match
for _, namespace := range appservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config
if namespace.RegexpObject.MatchString(username) {
if namespace.RegexpObject.MatchString(userID) {
return true
}
}
return false
}
// Loop through all known Application Service's namespaces and see if any match
for _, knownAppservice := range cfg.Derived.ApplicationServices {
for _, namespace := range knownAppservice.NamespaceMap["users"] {
// Loop through all known application service's namespaces and see if any match
for _, knownAppService := range cfg.Derived.ApplicationServices {
for _, namespace := range knownAppService.NamespaceMap["users"] {
// AS namespaces are checked for validity in config
if namespace.RegexpObject.MatchString(username) {
if namespace.RegexpObject.MatchString(userID) {
return true
}
}
@ -318,19 +339,28 @@ func UsernameMatchesMultipleExclusiveNamespaces(
cfg *config.Dendrite,
username string,
) bool {
userID := userutil.MakeUserID(username, cfg.Matrix.ServerName)
// Check namespaces and see if more than one match
matchCount := 0
for _, appservice := range cfg.Derived.ApplicationServices {
for _, namespaceSlice := range appservice.NamespaceMap {
for _, namespace := range namespaceSlice {
// Check if we have a match on this username
if namespace.RegexpObject.MatchString(username) {
matchCount++
}
if appservice.IsInterestedInUserID(userID) {
if matchCount++; matchCount > 1 {
return true
}
}
}
return matchCount > 1
return false
}
// UsernameMatchesExclusiveNamespaces will check if a given username matches any
// application service's exclusive users namespace
func UsernameMatchesExclusiveNamespaces(
cfg *config.Dendrite,
username string,
) bool {
userID := userutil.MakeUserID(username, cfg.Matrix.ServerName)
return cfg.Derived.ExclusiveApplicationServicesUsernameRegexp.MatchString(userID)
}
// validateApplicationService checks if a provided application service token
@ -344,7 +374,13 @@ func validateApplicationService(
) (string, *util.JSONResponse) {
// Check if the token if the application service is valid with one we have
// registered in the config.
accessToken := req.URL.Query().Get("access_token")
accessToken, err := auth.ExtractAccessToken(req)
if err != nil {
return "", &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.MissingToken(err.Error()),
}
}
var matchedApplicationService *config.ApplicationService
for _, appservice := range cfg.Derived.ApplicationServices {
if appservice.ASToken == accessToken {
@ -359,8 +395,10 @@ func validateApplicationService(
}
}
userID := userutil.MakeUserID(username, cfg.Matrix.ServerName)
// Ensure the desired username is within at least one of the application service's namespaces.
if !UsernameIsWithinApplicationServiceNamespace(cfg, username, matchedApplicationService) {
if !UserIDIsWithinApplicationServiceNamespace(cfg, userID, matchedApplicationService) {
// If we didn't find any matches, return M_EXCLUSIVE
return "", &util.JSONResponse{
Code: http.StatusBadRequest,
@ -370,7 +408,7 @@ func validateApplicationService(
}
// Check this user does not fit multiple application service namespaces
if UsernameMatchesMultipleExclusiveNamespaces(cfg, username) {
if UsernameMatchesMultipleExclusiveNamespaces(cfg, userID) {
return "", &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.ASExclusive(fmt.Sprintf(
@ -378,6 +416,11 @@ func validateApplicationService(
}
}
// Check username application service is trying to register is valid
if err := validateApplicationServiceUsername(username); err != nil {
return "", err
}
// No errors, registration valid
return matchedApplicationService.ID, nil
}
@ -421,19 +464,10 @@ func Register(
r.Username = strconv.FormatInt(id, 10)
}
// If no auth type is specified by the client, send back the list of available flows
if r.Auth.Type == "" {
return util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: newUserInteractiveResponse(sessionID,
cfg.Derived.Registration.Flows, cfg.Derived.Registration.Params),
}
}
// Squash username to all lowercase letters
r.Username = strings.ToLower(r.Username)
if resErr = validateUserName(r.Username); resErr != nil {
if resErr = validateUsername(r.Username); resErr != nil {
return *resErr
}
if resErr = validatePassword(r.Password); resErr != nil {
@ -442,9 +476,9 @@ func Register(
// Make sure normal user isn't registering under an exclusive application
// service namespace. Skip this check if no app services are registered.
if r.Auth.Type != "m.login.application_service" &&
if r.Auth.Type != authtypes.LoginTypeApplicationService &&
len(cfg.Derived.ApplicationServices) != 0 &&
cfg.Derived.ExclusiveApplicationServicesUsernameRegexp.MatchString(r.Username) {
UsernameMatchesExclusiveNamespaces(cfg, r.Username) {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.ASExclusive("This username is reserved by an application service."),
@ -508,11 +542,11 @@ func handleRegistrationFlow(
// Add SharedSecret to the list of completed registration stages
sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret)
case authtypes.LoginTypeApplicationService:
// Check Application Service register user request is valid.
case "", authtypes.LoginTypeApplicationService:
// not passing a Auth.Type is allowed for ApplicationServices. So assume that as well
// Check application service register user request is valid.
// The application service's ID is returned if so.
appserviceID, err := validateApplicationService(cfg, req, r.Username)
if err != nil {
return *err
}
@ -520,8 +554,10 @@ func handleRegistrationFlow(
// If no error, application service was successfully validated.
// Don't need to worry about appending to registration stages as
// application service registration is entirely separate.
return completeRegistration(req.Context(), accountDB, deviceDB,
r.Username, "", appserviceID, r.InitialDisplayName)
return completeRegistration(
req.Context(), accountDB, deviceDB, r.Username, "", appserviceID,
r.InhibitLogin, r.InitialDisplayName,
)
case authtypes.LoginTypeDummy:
// there is nothing to do
@ -556,8 +592,10 @@ func checkAndCompleteFlow(
) util.JSONResponse {
if checkFlowCompleted(flow, cfg.Derived.Registration.Flows) {
// This flow was completed, registration can continue
return completeRegistration(req.Context(), accountDB, deviceDB,
r.Username, r.Password, "", r.InitialDisplayName)
return completeRegistration(
req.Context(), accountDB, deviceDB, r.Username, r.Password, "",
r.InhibitLogin, r.InitialDisplayName,
)
}
// There are still more stages to complete.
@ -607,10 +645,10 @@ func LegacyRegister(
return util.MessageResponse(http.StatusForbidden, "HMAC incorrect")
}
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", nil)
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil)
case authtypes.LoginTypeDummy:
// there is nothing to do
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", nil)
return completeRegistration(req.Context(), accountDB, deviceDB, r.Username, r.Password, "", false, nil)
default:
return util.JSONResponse{
Code: http.StatusNotImplemented,
@ -630,7 +668,7 @@ func parseAndValidateLegacyLogin(req *http.Request, r *legacyRegisterRequest) *u
// Squash username to all lowercase letters
r.Username = strings.ToLower(r.Username)
if resErr = validateUserName(r.Username); resErr != nil {
if resErr = validateUsername(r.Username); resErr != nil {
return resErr
}
if resErr = validatePassword(r.Password); resErr != nil {
@ -653,6 +691,7 @@ func completeRegistration(
accountDB *accounts.Database,
deviceDB *devices.Database,
username, password, appserviceID string,
inhibitLogin common.WeakBoolean,
displayName *string,
) util.JSONResponse {
if username == "" {
@ -682,6 +721,18 @@ func completeRegistration(
}
}
// Check whether inhibit_login option is set. If so, don't create an access
// token or a device for this user
if inhibitLogin {
return util.JSONResponse{
Code: http.StatusOK,
JSON: registerResponse{
UserID: userutil.MakeUserID(username, acc.ServerName),
HomeServer: acc.ServerName,
},
}
}
token, err := auth.GenerateAccessToken()
if err != nil {
return util.JSONResponse{
@ -690,7 +741,7 @@ func completeRegistration(
}
}
// // TODO: Use the device ID in the request.
// TODO: Use the device ID in the request.
dev, err := deviceDB.CreateDevice(ctx, username, nil, token, displayName)
if err != nil {
return util.JSONResponse{
@ -822,7 +873,7 @@ func RegisterAvailable(
// Squash username to all lowercase letters
username = strings.ToLower(username)
if err := validateUserName(username); err != nil {
if err := validateUsername(username); err != nil {
return *err
}

View file

@ -15,9 +15,13 @@
package routing
import (
"net/http"
"net/url"
"regexp"
"testing"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/common/config"
)
var (
@ -145,3 +149,84 @@ func TestEmptyCompletedFlows(t *testing.T) {
t.Error("Empty Completed Flow Stages should be a empty slice: returned ", ret, ". Should be []")
}
}
// This method tests validation of the provided Application Service token and
// username that they're registering
func TestValidationOfApplicationServices(t *testing.T) {
// Set up application service namespaces
regex := "@_appservice_.*"
regexp, err := regexp.Compile(regex)
if err != nil {
t.Errorf("Error compiling regex: %s", regex)
}
fakeNamespace := config.ApplicationServiceNamespace{
Exclusive: true,
Regex: regex,
RegexpObject: regexp,
}
// Create a fake application service
fakeID := "FakeAS"
fakeSenderLocalpart := "_appservice_bot"
fakeApplicationService := config.ApplicationService{
ID: fakeID,
URL: "null",
ASToken: "1234",
HSToken: "4321",
SenderLocalpart: fakeSenderLocalpart,
NamespaceMap: map[string][]config.ApplicationServiceNamespace{
"users": {fakeNamespace},
},
}
// Set up a config
fakeConfig := config.Dendrite{}
fakeConfig.Matrix.ServerName = "localhost"
fakeConfig.Derived.ApplicationServices = []config.ApplicationService{fakeApplicationService}
// Access token is correct, user_id omitted so we are acting as SenderLocalpart
URL, _ := url.Parse("http://localhost/register?access_token=1234")
fakeHTTPRequest := http.Request{
Method: "POST",
URL: URL,
}
asID, resp := validateApplicationService(&fakeConfig, &fakeHTTPRequest, fakeSenderLocalpart)
if resp != nil || asID != fakeID {
t.Errorf("appservice should have validated and returned correct ID: %s", resp.JSON)
}
// Access token is incorrect, user_id omitted so we are acting as SenderLocalpart
URL, _ = url.Parse("http://localhost/register?access_token=xxxx")
fakeHTTPRequest = http.Request{
Method: "POST",
URL: URL,
}
asID, resp = validateApplicationService(&fakeConfig, &fakeHTTPRequest, fakeSenderLocalpart)
if resp == nil || asID == fakeID {
t.Errorf("access_token should have been marked as invalid")
}
// Access token is correct, acting as valid user_id
URL, _ = url.Parse("http://localhost/register?access_token=1234&user_id=@_appservice_bob:localhost")
fakeHTTPRequest = http.Request{
Method: "POST",
URL: URL,
}
asID, resp = validateApplicationService(&fakeConfig, &fakeHTTPRequest, "_appservice_bob")
if resp != nil || asID != fakeID {
t.Errorf("access_token and user_id should've been valid: %s", resp.JSON)
}
// Access token is correct, acting as invalid user_id
URL, _ = url.Parse("http://localhost/register?access_token=1234&user_id=@_something_else:localhost")
fakeHTTPRequest = http.Request{
Method: "POST",
URL: URL,
}
asID, resp = validateApplicationService(&fakeConfig, &fakeHTTPRequest, "_something_else")
if resp == nil || asID == fakeID {
t.Errorf("user_id should not have been valid: %s",
fakeHTTPRequest.URL.Query().Get("user_id"))
}
}

View file

@ -20,6 +20,7 @@ import (
"strings"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
@ -49,6 +50,7 @@ func Setup(
keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer,
syncProducer *producers.SyncAPIProducer,
typingProducer *producers.TypingServerProducer,
transactionsCache *transactions.Cache,
) {
@ -72,13 +74,15 @@ func Setup(
v1mux := apiMux.PathPrefix(pathPrefixV1).Subrouter()
unstableMux := apiMux.PathPrefix(pathPrefixUnstable).Subrouter()
authData := auth.Data{accountDB, deviceDB, cfg.Derived.ApplicationServices}
r0mux.Handle("/createRoom",
common.MakeAuthAPI("createRoom", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return CreateRoom(req, device, cfg, producer, accountDB, aliasAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
common.MakeAuthAPI("join", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("join", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return JoinRoomByIDOrAlias(
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB,
@ -86,19 +90,19 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/{membership:(?:join|kick|ban|unban|leave|invite)}",
common.MakeAuthAPI("membership", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("membership", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, queryAPI, producer)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("send_message", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer, nil)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("send_message", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID,
@ -106,7 +110,7 @@ func Setup(
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("send_message", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
emptyString := ""
eventType := vars["eventType"]
@ -118,7 +122,7 @@ func Setup(
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("send_message", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
stateKey := vars["stateKey"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer, nil)
@ -138,38 +142,45 @@ func Setup(
})).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/directory/room/{roomAlias}",
common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("directory_room", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return DirectoryRoom(req, vars["roomAlias"], federation, &cfg, aliasAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/directory/room/{roomAlias}",
common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("directory_room", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SetLocalAlias(req, device, vars["roomAlias"], &cfg, aliasAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/directory/room/{roomAlias}",
common.MakeAuthAPI("directory_room", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("directory_room", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return RemoveLocalAlias(req, device, vars["roomAlias"], aliasAPI)
}),
).Methods(http.MethodDelete, http.MethodOptions)
r0mux.Handle("/logout",
common.MakeAuthAPI("logout", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("logout", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return Logout(req, deviceDB, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/logout/all",
common.MakeAuthAPI("logout", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("logout", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return LogoutAll(req, deviceDB, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/typing/{userID}",
common.MakeAuthAPI("rooms_typing", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, typingProducer)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Stub endpoints required by Riot
r0mux.Handle("/login",
@ -198,14 +209,14 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/user/{userId}/filter",
common.MakeAuthAPI("put_filter", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("put_filter", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return PutFilter(req, device, accountDB, vars["userId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/user/{userId}/filter/{filterId}",
common.MakeAuthAPI("get_filter", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("get_filter", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return GetFilter(req, device, accountDB, vars["userId"], vars["filterId"])
}),
@ -228,7 +239,7 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/profile/{userID}/avatar_url",
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("profile_avatar_url", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
}),
@ -244,7 +255,7 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/profile/{userID}/displayname",
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("profile_displayname", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
}),
@ -253,19 +264,19 @@ func Setup(
// PUT requests, so we need to allow this method
r0mux.Handle("/account/3pid",
common.MakeAuthAPI("account_3pid", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("account_3pid", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return GetAssociated3PIDs(req, accountDB, device)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/account/3pid",
common.MakeAuthAPI("account_3pid", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("account_3pid", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return CheckAndSave3PIDAssociation(req, accountDB, device, cfg)
}),
).Methods(http.MethodPost, http.MethodOptions)
unstableMux.Handle("/account/3pid/delete",
common.MakeAuthAPI("account_3pid", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("account_3pid", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return Forget3PID(req, accountDB)
}),
).Methods(http.MethodPost, http.MethodOptions)
@ -288,7 +299,7 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/voip/turnServer",
common.MakeAuthAPI("turn_server", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("turn_server", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return RequestTurnServer(req, device, cfg)
}),
).Methods(http.MethodGet, http.MethodOptions)
@ -314,28 +325,28 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/user/{userID}/account_data/{type}",
common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("user_account_data", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"], syncProducer)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/user/{userID}/rooms/{roomID}/account_data/{type}",
common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("user_account_data", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"], syncProducer)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/members",
common.MakeAuthAPI("rooms_members", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("rooms_members", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return GetMemberships(req, device, vars["roomID"], false, cfg, queryAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/joined_members",
common.MakeAuthAPI("rooms_members", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("rooms_members", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return GetMemberships(req, device, vars["roomID"], true, cfg, queryAPI)
}),
@ -348,28 +359,21 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/typing/{userID}",
common.MakeExternalAPI("rooms_typing", func(req *http.Request) util.JSONResponse {
// TODO: handling typing
return util.JSONResponse{Code: http.StatusOK, JSON: struct{}{}}
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/devices",
common.MakeAuthAPI("get_devices", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("get_devices", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return GetDevicesByLocalpart(req, deviceDB, device)
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/devices/{deviceID}",
common.MakeAuthAPI("get_device", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("get_device", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return GetDeviceByID(req, deviceDB, device, vars["deviceID"])
}),
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/devices/{deviceID}",
common.MakeAuthAPI("device_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("device_data", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return UpdateDeviceByID(req, deviceDB, device, vars["deviceID"])
}),

View file

@ -76,7 +76,7 @@ func SendEvent(
}
var queryRes api.QueryLatestEventsAndStateResponse
e, err := common.BuildEvent(req.Context(), &builder, cfg, queryAPI, &queryRes)
e, err := common.BuildEvent(req, &builder, cfg, queryAPI, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,

View file

@ -0,0 +1,80 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"database/sql"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/util"
)
type typingContentJSON struct {
Typing bool `json:"typing"`
Timeout int64 `json:"timeout"`
}
// SendTyping handles PUT /rooms/{roomID}/typing/{userID}
// sends the typing events to client API typingProducer
func SendTyping(
req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB *accounts.Database,
typingProducer *producers.TypingServerProducer,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Cannot set another user's typing state"),
}
}
localpart, err := userutil.ParseUsernameParam(userID, nil)
if err != nil {
return httputil.LogThenError(req, err)
}
// Verify that the user is a member of this room
_, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID)
if err == sql.ErrNoRows {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("User not in this room"),
}
} else if err != nil {
return httputil.LogThenError(req, err)
}
// parse the incoming http request
var r typingContentJSON
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
if err = typingProducer.Send(
req.Context(), userID, roomID, r.Typing, r.Timeout,
); err != nil {
return httputil.LogThenError(req, err)
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -85,7 +85,7 @@ var (
// fills the Matrix ID in the request body so a normal invite membership event
// can be emitted.
func CheckAndProcessInvite(
ctx context.Context,
req *http.Request,
device *authtypes.Device, body *MembershipRequest, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, db *accounts.Database,
producer *producers.RoomserverProducer, membership string, roomID string,
@ -101,7 +101,7 @@ func CheckAndProcessInvite(
return
}
lookupRes, storeInviteRes, err := queryIDServer(ctx, db, cfg, device, body, roomID)
lookupRes, storeInviteRes, err := queryIDServer(req.Context(), db, cfg, device, body, roomID)
if err != nil {
return
}
@ -110,7 +110,7 @@ func CheckAndProcessInvite(
// No Matrix ID could be found for this 3PID, meaning that a
// "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes.
err = emit3PIDInviteEvent(ctx, body, storeInviteRes, device, roomID, cfg, queryAPI, producer)
err = emit3PIDInviteEvent(req, body, storeInviteRes, device, roomID, cfg, queryAPI, producer)
inviteStoredOnIDServer = err == nil
return
@ -325,7 +325,7 @@ func checkIDServerSignatures(
// emit3PIDInviteEvent builds and sends a "m.room.third_party_invite" event.
// Returns an error if something failed in the process.
func emit3PIDInviteEvent(
ctx context.Context,
req *http.Request,
body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer,
@ -350,11 +350,11 @@ func emit3PIDInviteEvent(
}
var queryRes *api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(ctx, builder, cfg, queryAPI, queryRes)
event, err := common.BuildEvent(req, builder, cfg, queryAPI, queryRes)
if err != nil {
return err
}
_, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
return err
}

View file

@ -22,16 +22,17 @@ import (
func main() {
cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "AppService")
base := basecomponent.NewBaseDendrite(cfg, "AppServiceAPI")
defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := base.CreateFederationClient()
alias, _, query := base.CreateHTTPRoomserverAPIs()
cache := transactions.New()
appservice.SetupAppServiceAPIComponent(
base, accountDB, federation, alias, query, cache,
base, accountDB, deviceDB, federation, alias, query, cache,
)
base.SetupAndServeHTTP(string(base.Cfg.Listen.FederationSender))

View file

@ -34,11 +34,12 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := base.CreateHTTPRoomserverAPIs()
typingInputAPI := base.CreateHTTPTypingServerAPIs()
cache := transactions.New()
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, cache,
alias, input, query, typingInputAPI, cache,
)
base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
@ -56,12 +57,13 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base)
encryptoapi.SetupEcryptoapi(base, deviceDB)
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB,
federation, &keyRing, alias, input, query,
federation, &keyRing, alias, input, query, typingInputAPI,
transactions.New(),
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query)
@ -69,7 +71,7 @@ func main() {
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
appservice.SetupAppServiceAPIComponent(base, accountDB, federation, alias, query, transactions.New())
appservice.SetupAppServiceAPIComponent(base, accountDB, deviceDB, federation, alias, query, transactions.New())
httpHandler := common.WrapHandlerInCORS(base.APIMux)

View file

@ -30,8 +30,10 @@ import (
"github.com/gorilla/mux"
sarama "gopkg.in/Shopify/sarama.v1"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/sirupsen/logrus"
)
@ -80,15 +82,31 @@ func (b *BaseDendrite) Close() error {
return b.tracerCloser.Close()
}
// CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI to hit
// CreateHTTPAppServiceAPIs returns the QueryAPI for hitting the appservice
// component over HTTP.
func (b *BaseDendrite) CreateHTTPAppServiceAPIs() appserviceAPI.AppServiceQueryAPI {
return appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), nil)
}
// CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI for hitting
// the roomserver over HTTP.
func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (api.RoomserverAliasAPI, api.RoomserverInputAPI, api.RoomserverQueryAPI) {
alias := api.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), nil)
input := api.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), nil)
query := api.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil)
func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (
roomserverAPI.RoomserverAliasAPI,
roomserverAPI.RoomserverInputAPI,
roomserverAPI.RoomserverQueryAPI,
) {
alias := roomserverAPI.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), nil)
input := roomserverAPI.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), nil)
query := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil)
return alias, input, query
}
// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing
// server over HTTP
func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI {
return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil)
}
// CreateDeviceDB creates a new instance of the device database. Should only be
// called once per component.
func (b *BaseDendrite) CreateDeviceDB() *devices.Database {

View file

@ -66,9 +66,57 @@ type ApplicationService struct {
Protocols []string `yaml:"protocols"`
}
// loadAppservices iterates through all application service config files
// IsInterestedInRoomID returns a bool on whether an application service's
// namespace includes the given room ID
func (a *ApplicationService) IsInterestedInRoomID(
roomID string,
) bool {
if namespaceSlice, ok := a.NamespaceMap["rooms"]; ok {
for _, namespace := range namespaceSlice {
if namespace.RegexpObject.MatchString(roomID) {
return true
}
}
}
return false
}
// IsInterestedInUserID returns a bool on whether an application service's
// namespace includes the given user ID
func (a *ApplicationService) IsInterestedInUserID(
userID string,
) bool {
if namespaceSlice, ok := a.NamespaceMap["users"]; ok {
for _, namespace := range namespaceSlice {
if namespace.RegexpObject.MatchString(userID) {
return true
}
}
}
return false
}
// IsInterestedInRoomAlias returns a bool on whether an application service's
// namespace includes the given room alias
func (a *ApplicationService) IsInterestedInRoomAlias(
roomAlias string,
) bool {
if namespaceSlice, ok := a.NamespaceMap["aliases"]; ok {
for _, namespace := range namespaceSlice {
if namespace.RegexpObject.MatchString(roomAlias) {
return true
}
}
}
return false
}
// loadAppServices iterates through all application service config files
// and loads their data into the config object for later access.
func loadAppservices(config *Dendrite) error {
func loadAppServices(config *Dendrite) error {
for _, configPath := range config.ApplicationServices.ConfigFiles {
// Create a new application service with default options
appservice := ApplicationService{
@ -123,8 +171,6 @@ func setupRegexps(cfg *Dendrite) (err error) {
}
}
fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings)
// Join the regexes together into one big regex.
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
// Later we can check if a username or alias matches any exclusive regex and
@ -190,17 +236,20 @@ func checkErrors(config *Dendrite) (err error) {
}
}
// Check if the url has trailing /'s. If so, remove them
appservice.URL = strings.TrimRight(appservice.URL, "/")
// Check if we've already seen this ID. No two application services
// can have the same ID or token.
if idMap[appservice.ID] {
return configErrors([]string{fmt.Sprintf(
"Application Service ID %s must be unique", appservice.ID,
"Application service ID %s must be unique", appservice.ID,
)})
}
// Check if we've already seen this token
if tokenMap[appservice.ASToken] {
return configErrors([]string{fmt.Sprintf(
"Application Service Token %s must be unique", appservice.ASToken,
"Application service Token %s must be unique", appservice.ASToken,
)})
}
@ -209,18 +258,6 @@ func checkErrors(config *Dendrite) (err error) {
idMap[appservice.ID] = true
tokenMap[appservice.ASToken] = true
// Check if more than one regex exists per namespace
for _, namespace := range appservice.NamespaceMap {
if len(namespace) > 1 {
// It's quite easy to accidentally make multiple regex objects per
// namespace, which often ends up in an application service receiving events
// it doesn't want, as an empty regex will match all events.
return configErrors([]string{fmt.Sprintf(
"Application Service namespace can only contain a single regex tuple. Check your YAML.",
)})
}
}
// TODO: Remove once rate_limited is implemented
if appservice.RateLimited {
log.Warn("WARNING: Application service option rate_limited is currently unimplemented")

View file

@ -162,6 +162,9 @@ type Dendrite struct {
// The FederationSender database stores information used by the FederationSender
// It is only accessed by the FederationSender.
FederationSender DataSource `yaml:"federation_sender"`
// The AppServices database stores information used by the AppService component.
// It is only accessed by the AppService component.
AppService DataSource `yaml:"appservice"`
// The PublicRoomsAPI database stores information used to compute the public
// room directory. It is only accessed by the PublicRoomsAPI server.
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
@ -197,10 +200,12 @@ type Dendrite struct {
MediaAPI Address `yaml:"media_api"`
ClientAPI Address `yaml:"client_api"`
FederationAPI Address `yaml:"federation_api"`
AppServiceAPI Address `yaml:"appservice_api"`
SyncAPI Address `yaml:"sync_api"`
RoomServer Address `yaml:"room_server"`
FederationSender Address `yaml:"federation_sender"`
PublicRoomsAPI Address `yaml:"public_rooms_api"`
TypingServer Address `yaml:"typing_server"`
} `yaml:"listen"`
// The config for tracing the dendrite servers.
@ -233,15 +238,15 @@ type Dendrite struct {
Params map[string]interface{} `json:"params"`
}
// Application Services parsed from their config files
// Application services parsed from their config files
// The paths of which were given above in the main config file
ApplicationServices []ApplicationService
// Meta-regexes compiled from all exclusive Application Service
// Meta-regexes compiled from all exclusive application service
// Regexes.
//
// When a user registers, we check that their username does not match any
// exclusive Application Service namespaces
// exclusive application service namespaces
ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
// When a user creates a room alias, we check that it isn't already
// reserved by an application service
@ -407,7 +412,7 @@ func (config *Dendrite) derive() error {
}
// Load application service configuration files
if err := loadAppservices(config); err != nil {
if err := loadAppServices(config); err != nil {
return err
}
@ -544,6 +549,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI))
checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI))
checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer))
checkNotEmpty(configErrs, "listen.typing_server", string(config.Listen.TypingServer))
}
// checkLogging verifies the parameters logging.* are valid.
@ -639,6 +645,15 @@ func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint {
}
}
// AppServiceURL returns a HTTP URL for where the appservice component is listening.
func (config *Dendrite) AppServiceURL() string {
// Hard code the roomserver to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return "http://" + string(config.Listen.AppServiceAPI)
}
// RoomServerURL returns an HTTP URL for where the roomserver is listening.
func (config *Dendrite) RoomServerURL() string {
// Hard code the roomserver to talk HTTP for now.
@ -648,6 +663,15 @@ func (config *Dendrite) RoomServerURL() string {
return "http://" + string(config.Listen.RoomServer)
}
// TypingServerURL returns an HTTP URL for where the typing server is listening.
func (config *Dendrite) TypingServerURL() string {
// Hard code the typing server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return "http://" + string(config.Listen.TypingServer)
}
// SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
return config.Tracing.Jaeger.InitGlobalTracer(

View file

@ -59,6 +59,7 @@ listen:
federation_api: "localhost:7772"
sync_api: "localhost:7773"
media_api: "localhost:7774"
typing_server: "localhost:7778"
logging:
- type: "file"
level: "info"

View file

@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/matrix-org/dendrite/common/config"
@ -38,18 +40,18 @@ var ErrRoomNoExists = errors.New("Room does not exist")
// the room doesn't exist
// Returns an error if something else went wrong
func BuildEvent(
ctx context.Context,
req *http.Request,
builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, queryRes *api.QueryLatestEventsAndStateResponse,
) (*gomatrixserverlib.Event, error) {
err := AddPrevEventsToEvent(ctx, builder, queryAPI, queryRes)
err := AddPrevEventsToEvent(req.Context(), builder, queryAPI, queryRes)
if err != nil {
return nil, err
}
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
now := time.Now()
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
eventTime := ParseTSParam(req)
event, err := builder.Build(eventID, eventTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
if err != nil {
return nil, err
}
@ -57,6 +59,25 @@ func BuildEvent(
return &event, nil
}
// ParseTSParam takes a req from an application service and parses a Time object
// from the req if it exists in the query parameters. If it doesn't exist, the
// current time is returned.
func ParseTSParam(req *http.Request) time.Time {
// Use the ts parameter's value for event time if present
tsStr := req.URL.Query().Get("ts")
if tsStr == "" {
return time.Now()
}
// The parameter exists, parse into a Time object
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return time.Unix(ts/1000, 0)
}
return time.Unix(ts/1000, 0)
}
// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder
func AddPrevEventsToEvent(
ctx context.Context,

View file

@ -0,0 +1,57 @@
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)
// PostJSON performs a POST request with JSON on an internal HTTP API
func PostJSON(
ctx context.Context, span opentracing.Span, httpClient *http.Client,
apiURL string, request, response interface{},
) error {
jsonBytes, err := json.Marshal(request)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, apiURL, bytes.NewReader(jsonBytes))
if err != nil {
return err
}
// Mark the span as being an RPC client.
ext.SpanKindRPCClient.Set(span)
carrier := opentracing.HTTPHeadersCarrier(req.Header)
tracer := opentracing.GlobalTracer()
if err = tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := httpClient.Do(req.WithContext(ctx))
if res != nil {
defer (func() { err = res.Body.Close() })()
}
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
var errorBody struct {
Message string `json:"message"`
}
if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil {
return err
}
return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message)
}
return json.NewDecoder(res.Body).Decode(response)
}

View file

@ -13,13 +13,17 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which checks the access token in the request.
func MakeAuthAPI(metricsName string, deviceDB auth.DeviceDatabase, f func(*http.Request, *authtypes.Device) util.JSONResponse) http.Handler {
// MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which authenticates the request.
func MakeAuthAPI(
metricsName string, data auth.Data,
f func(*http.Request, *authtypes.Device) util.JSONResponse,
) http.Handler {
h := func(req *http.Request) util.JSONResponse {
device, resErr := auth.VerifyAccessToken(req, deviceDB)
if resErr != nil {
return *resErr
device, err := auth.VerifyUserFromRequest(req, data)
if err != nil {
return *err
}
return f(req, device)
}
return MakeExternalAPI(metricsName, h)

View file

@ -103,6 +103,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Listen.RoomServer = assignAddress()
cfg.Listen.SyncAPI = assignAddress()
cfg.Listen.PublicRoomsAPI = assignAddress()
cfg.Listen.TypingServer = assignAddress()
return &cfg, port, nil
}

View file

@ -0,0 +1,34 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import "sort"
// UnsortedStringSliceEqual returns true if the slices have same length & elements.
// Does not modify the given slice.
func UnsortedStringSliceEqual(first, second []string) bool {
if len(first) != len(second) {
return false
}
a, b := first[:], second[:]
sort.Strings(a)
sort.Strings(b)
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

View file

@ -14,6 +14,10 @@
package common
import (
"strconv"
)
// AccountData represents account data sent from the client API server to the
// sync API server
type AccountData struct {
@ -36,3 +40,22 @@ type AvatarURL struct {
type DisplayName struct {
DisplayName string `json:"displayname"`
}
// WeakBoolean is a type that will Unmarshal to true or false even if the encoded
// representation is "true"/1 or "false"/0, as well as whatever other forms are
// recognized by strconv.ParseBool
type WeakBoolean bool
// UnmarshalJSON is overridden here to allow strings vaguely representing a true
// or false boolean to be set as their closest counterpart
func (b *WeakBoolean) UnmarshalJSON(data []byte) error {
result, err := strconv.ParseBool(string(data))
if err != nil {
return err
}
// Set boolean value based on string input
*b = WeakBoolean(result)
return nil
}

View file

@ -17,9 +17,7 @@ package routing
import (
"context"
"net/http"
"time"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -29,10 +27,7 @@ import (
func GetEvent(
ctx context.Context,
request *gomatrixserverlib.FederationRequest,
_ config.Dendrite,
query api.RoomserverQueryAPI,
_ time.Time,
_ gomatrixserverlib.KeyRing,
eventID string,
) util.JSONResponse {
event, err := getEvent(ctx, request, query, eventID)

View file

@ -31,7 +31,6 @@ import (
// MakeJoin implements the /make_join API
func MakeJoin(
ctx context.Context,
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg config.Dendrite,
@ -65,7 +64,7 @@ func MakeJoin(
}
var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(ctx, &builder, cfg, query, &queryRes)
event, err := common.BuildEvent(httpReq, &builder, cfg, query, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,

View file

@ -0,0 +1,174 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// MakeLeave implements the /make_leave API
func MakeLeave(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg config.Dendrite,
query api.RoomserverQueryAPI,
roomID, userID string,
) util.JSONResponse {
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("Invalid UserID"),
}
}
if domain != request.Origin() {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("The leave must be sent by the server of the user"),
}
}
// Try building an event for the server
builder := gomatrixserverlib.EventBuilder{
Sender: userID,
RoomID: roomID,
Type: "m.room.member",
StateKey: &userID,
}
err = builder.SetContent(map[string]interface{}{"membership": "leave"})
if err != nil {
return httputil.LogThenError(httpReq, err)
}
var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(httpReq, &builder, cfg, query, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room does not exist"),
}
} else if err != nil {
return httputil.LogThenError(httpReq, err)
}
// Check that the leave is allowed or not
stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents))
for i := range queryRes.StateEvents {
stateEvents[i] = &queryRes.StateEvents[i]
}
provider := gomatrixserverlib.NewAuthEvents(stateEvents)
if err = gomatrixserverlib.Allowed(*event, &provider); err != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(err.Error()),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: map[string]interface{}{"event": builder},
}
}
// SendLeave implements the /send_leave API
func SendLeave(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg config.Dendrite,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
var event gomatrixserverlib.Event
if err := json.Unmarshal(request.Content(), &event); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
// Check that the room ID is correct.
if event.RoomID() != roomID {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The room ID in the request path must match the room ID in the leave event JSON"),
}
}
// Check that the event ID is correct.
if event.EventID() != eventID {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The event ID in the request path must match the event ID in the leave event JSON"),
}
}
// Check that the event is from the server sending the request.
if event.Origin() != request.Origin() {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("The leave must be sent by the server it originated on"),
}
}
// Check that the event is signed by the server sending the request.
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
ServerName: event.Origin(),
Message: event.Redact().JSON(),
AtTS: event.OriginServerTS(),
}}
verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests)
if err != nil {
return httputil.LogThenError(httpReq, err)
}
if verifyResults[0].Error != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("The leave must be signed by the server it originated on"),
}
}
// check membership is set to leave
mem, err := event.Membership()
if err != nil {
return httputil.LogThenError(httpReq, err)
} else if mem != "leave" {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The membership in the event content must be set to leave"),
}
}
// Send the events to the room server.
// We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents(httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
if err != nil {
return httputil.LogThenError(httpReq, err)
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -15,6 +15,7 @@
package routing
import (
"database/sql"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -52,7 +53,12 @@ func GetProfile(
}
profile, err := accountDB.GetProfileByLocalpart(httpReq.Context(), localpart)
if err != nil {
if err == sql.ErrNoRows {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("no profile information for this user or this user does not exist"),
}
} else if err != nil {
return httputil.LogThenError(httpReq, err)
}

View file

@ -103,29 +103,29 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetEvent(
httpReq.Context(), request, cfg, query, time.Now(), keys, vars["eventID"],
httpReq.Context(), request, query, vars["eventID"],
)
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/state/{roomID}/{eventID}", common.MakeFedAPI(
v1fedmux.Handle("/state/{roomID}", common.MakeFedAPI(
"federation_get_event_auth", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetState(
httpReq.Context(), request, cfg, query, time.Now(),
keys, vars["roomID"], vars["eventID"],
keys, vars["roomID"],
)
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/state_ids/{roomID}/{eventID}", common.MakeFedAPI(
v1fedmux.Handle("/state_ids/{roomID}", common.MakeFedAPI(
"federation_get_event_auth", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetStateIDs(
httpReq.Context(), request, cfg, query, time.Now(),
keys, vars["roomID"], vars["eventID"],
keys, vars["roomID"],
)
},
)).Methods(http.MethodGet)
@ -148,8 +148,8 @@ func Setup(
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/query/user_devices/{userID}", common.MakeFedAPI(
"federation_query_user_devices", cfg.Matrix.ServerName, keys,
v1fedmux.Handle("/user/devices/{userID}", common.MakeFedAPI(
"federation_user_devices", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
return GetUserDevices(
@ -165,7 +165,7 @@ func Setup(
roomID := vars["roomID"]
userID := vars["userID"]
return MakeJoin(
httpReq.Context(), httpReq, request, cfg, query, roomID, userID,
httpReq, request, cfg, query, roomID, userID,
)
},
)).Methods(http.MethodGet)
@ -182,6 +182,30 @@ func Setup(
},
)).Methods(http.MethodPut)
v1fedmux.Handle("/make_leave/{roomID}/{userID}", common.MakeFedAPI(
"federation_make_leave", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
roomID := vars["roomID"]
userID := vars["userID"]
return MakeLeave(
httpReq, request, cfg, query, roomID, userID,
)
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/send_leave/{roomID}/{userID}", common.MakeFedAPI(
"federation_send_leave", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars := mux.Vars(httpReq)
roomID := vars["roomID"]
userID := vars["userID"]
return SendLeave(
httpReq, request, cfg, producer, keys, roomID, userID,
)
},
)).Methods(http.MethodPut)
v1fedmux.Handle("/version", common.MakeExternalAPI(
"federation_version",
func(httpReq *http.Request) util.JSONResponse {

View file

@ -15,8 +15,10 @@ package routing
import (
"context"
"net/http"
"net/url"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@ -32,8 +34,12 @@ func GetState(
_ time.Time,
_ gomatrixserverlib.KeyRing,
roomID string,
eventID string,
) util.JSONResponse {
eventID, err := parseEventIDParam(request)
if err != nil {
return *err
}
state, err := getState(ctx, request, query, roomID, eventID)
if err != nil {
return *err
@ -51,8 +57,12 @@ func GetStateIDs(
_ time.Time,
_ gomatrixserverlib.KeyRing,
roomID string,
eventID string,
) util.JSONResponse {
eventID, err := parseEventIDParam(request)
if err != nil {
return *err
}
state, err := getState(ctx, request, query, roomID, eventID)
if err != nil {
return *err
@ -68,6 +78,26 @@ func GetStateIDs(
}
}
func parseEventIDParam(
request *gomatrixserverlib.FederationRequest,
) (eventID string, resErr *util.JSONResponse) {
URL, err := url.Parse(request.RequestURI())
if err != nil {
*resErr = util.ErrorResponse(err)
return
}
eventID = URL.Query().Get("event_id")
if eventID == "" {
resErr = &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("event_id missing"),
}
}
return
}
func getState(
ctx context.Context,
request *gomatrixserverlib.FederationRequest,

View file

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net/http"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
@ -70,7 +69,7 @@ func CreateInvitesFrom3PIDInvites(
evs := []gomatrixserverlib.Event{}
for _, inv := range body.Invites {
event, err := createInviteFrom3PIDInvite(
req.Context(), queryAPI, cfg, inv, federation, accountDB,
req, queryAPI, cfg, inv, federation, accountDB,
)
if err != nil {
return httputil.LogThenError(req, err)
@ -135,7 +134,7 @@ func ExchangeThirdPartyInvite(
}
// Auth and build the event from what the remote server sent us
event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg)
event, err := buildMembershipEvent(httpReq, &builder, queryAPI, cfg)
if err == errNotInRoom {
return util.JSONResponse{
Code: http.StatusNotFound,
@ -170,7 +169,7 @@ func ExchangeThirdPartyInvite(
// Returns an error if there was a problem building the event or fetching the
// necessary data to do so.
func createInviteFrom3PIDInvite(
ctx context.Context, queryAPI api.RoomserverQueryAPI, cfg config.Dendrite,
req *http.Request, queryAPI api.RoomserverQueryAPI, cfg config.Dendrite,
inv invite, federation *gomatrixserverlib.FederationClient,
accountDB *accounts.Database,
) (*gomatrixserverlib.Event, error) {
@ -191,7 +190,7 @@ func createInviteFrom3PIDInvite(
StateKey: &inv.MXID,
}
profile, err := accountDB.GetProfileByLocalpart(ctx, localpart)
profile, err := accountDB.GetProfileByLocalpart(req.Context(), localpart)
if err != nil {
return nil, err
}
@ -209,9 +208,9 @@ func createInviteFrom3PIDInvite(
return nil, err
}
event, err := buildMembershipEvent(ctx, builder, queryAPI, cfg)
event, err := buildMembershipEvent(req, builder, queryAPI, cfg)
if err == errNotInRoom {
return nil, sendToRemoteServer(ctx, inv, federation, cfg, *builder)
return nil, sendToRemoteServer(req.Context(), inv, federation, cfg, *builder)
}
if err != nil {
return nil, err
@ -226,7 +225,7 @@ func createInviteFrom3PIDInvite(
// Returns errNotInRoom if the server is not in the room the invite is for.
// Returns an error if something failed during the process.
func buildMembershipEvent(
ctx context.Context,
req *http.Request,
builder *gomatrixserverlib.EventBuilder, queryAPI api.RoomserverQueryAPI,
cfg config.Dendrite,
) (*gomatrixserverlib.Event, error) {
@ -241,7 +240,7 @@ func buildMembershipEvent(
StateToFetch: eventsNeeded.Tuples(),
}
var queryRes api.QueryLatestEventsAndStateResponse
if err = queryAPI.QueryLatestEventsAndState(ctx, &queryReq, &queryRes); err != nil {
if err = queryAPI.QueryLatestEventsAndState(req.Context(), &queryReq, &queryRes); err != nil {
return nil, err
}
@ -274,8 +273,8 @@ func buildMembershipEvent(
builder.AuthEvents = refs
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
now := time.Now()
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
eventTime := common.ParseTSParam(req)
event, err := builder.Build(eventID, eventTime, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
return &event, err
}

View file

@ -17,6 +17,7 @@ package routing
import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/gorilla/mux"
@ -45,10 +46,11 @@ func Setup(
activeThumbnailGeneration := &types.ActiveThumbnailGeneration{
PathToResult: map[string]*types.ThumbnailGenerationResult{},
}
authData := auth.Data{nil, deviceDB, nil}
// TODO: Add AS support
r0mux.Handle("/upload", common.MakeAuthAPI(
"upload",
deviceDB,
"upload", authData,
func(req *http.Request, _ *authtypes.Device) util.JSONResponse {
return Upload(req, cfg, db, activeThumbnailGeneration)
},

View file

@ -18,6 +18,7 @@ import (
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common"
@ -31,14 +32,18 @@ const pathPrefixR0 = "/_matrix/client/r0"
// Setup configures the given mux with publicroomsapi server listeners
func Setup(apiMux *mux.Router, deviceDB *devices.Database, publicRoomsDB *storage.PublicRoomsServerDatabase) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
authData := auth.Data{nil, deviceDB, nil}
r0mux.Handle("/directory/list/room/{roomID}",
common.MakeExternalAPI("directory_list", func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return directory.GetVisibility(req, publicRoomsDB, vars["roomID"])
}),
).Methods(http.MethodGet, http.MethodOptions)
// TODO: Add AS support
r0mux.Handle("/directory/list/room/{roomID}",
common.MakeAuthAPI("directory_list", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
common.MakeAuthAPI("directory_list", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return directory.SetVisibility(req, publicRoomsDB, vars["roomID"])
}),

View file

@ -18,6 +18,7 @@ import (
"context"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
opentracing "github.com/opentracing/opentracing-go"
)
@ -139,7 +140,7 @@ func (h *httpRoomserverAliasAPI) SetRoomAlias(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverSetRoomAliasPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// GetRoomIDForAlias implements RoomserverAliasAPI
@ -152,7 +153,7 @@ func (h *httpRoomserverAliasAPI) GetRoomIDForAlias(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverGetRoomIDForAliasPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// GetAliasesForRoomID implements RoomserverAliasAPI
@ -165,7 +166,7 @@ func (h *httpRoomserverAliasAPI) GetAliasesForRoomID(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverGetAliasesForRoomIDPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// RemoveRoomAlias implements RoomserverAliasAPI
@ -178,5 +179,5 @@ func (h *httpRoomserverAliasAPI) RemoveRoomAlias(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverRemoveRoomAliasPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -19,6 +19,7 @@ import (
"context"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go"
)
@ -134,5 +135,5 @@ func (h *httpRoomserverInputAPI) InputRoomEvents(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverInputRoomEventsPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -15,16 +15,12 @@
package api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go"
)
// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState
@ -104,6 +100,25 @@ type QueryEventsByIDResponse struct {
Events []gomatrixserverlib.Event `json:"events"`
}
// QueryMembershipForUserRequest is a request to QueryMembership
type QueryMembershipForUserRequest struct {
// ID of the room to fetch membership from
RoomID string `json:"room_id"`
// ID of the user for whom membership is requested
UserID string `json:"user_id"`
}
// QueryMembershipForUserResponse is a response to QueryMembership
type QueryMembershipForUserResponse struct {
// The EventID of the latest "m.room.member" event for the sender,
// if HasBeenInRoom is true.
EventID string `json:"event_id"`
// True if the user has been in room before and has either stayed in it or left it.
HasBeenInRoom bool `json:"has_been_in_room"`
// True if the user is in room.
IsInRoom bool `json:"is_in_room"`
}
// QueryMembershipsForRoomRequest is a request to QueryMembershipsForRoom
type QueryMembershipsForRoomRequest struct {
// If true, only returns the membership events of "join" membership
@ -222,6 +237,13 @@ type RoomserverQueryAPI interface {
response *QueryEventsByIDResponse,
) error
// Query the membership event for an user for a room.
QueryMembershipForUser(
ctx context.Context,
request *QueryMembershipForUserRequest,
response *QueryMembershipForUserResponse,
) error
// Query a list of membership events for a room
QueryMembershipsForRoom(
ctx context.Context,
@ -269,6 +291,9 @@ const RoomserverQueryStateAfterEventsPath = "/api/roomserver/queryStateAfterEven
// RoomserverQueryEventsByIDPath is the HTTP path for the QueryEventsByID API.
const RoomserverQueryEventsByIDPath = "/api/roomserver/queryEventsByID"
// RoomserverQueryMembershipForUserPath is the HTTP path for the QueryMembershipForUser API.
const RoomserverQueryMembershipForUserPath = "/api/roomserver/queryMembershipForUser"
// RoomserverQueryMembershipsForRoomPath is the HTTP path for the QueryMembershipsForRoom API
const RoomserverQueryMembershipsForRoomPath = "/api/roomserver/queryMembershipsForRoom"
@ -308,7 +333,7 @@ func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryLatestEventsAndStatePath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryStateAfterEvents implements RoomserverQueryAPI
@ -321,7 +346,7 @@ func (h *httpRoomserverQueryAPI) QueryStateAfterEvents(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryStateAfterEventsPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryEventsByID implements RoomserverQueryAPI
@ -334,7 +359,20 @@ func (h *httpRoomserverQueryAPI) QueryEventsByID(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryEventsByIDPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryMembershipForUser implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryMembershipForUser(
ctx context.Context,
request *QueryMembershipForUserRequest,
response *QueryMembershipForUserResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryMembershipForUser")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryMembershipForUserPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryMembershipsForRoom implements RoomserverQueryAPI
@ -347,7 +385,7 @@ func (h *httpRoomserverQueryAPI) QueryMembershipsForRoom(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryMembershipsForRoomPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryInvitesForUser implements RoomserverQueryAPI
@ -360,7 +398,7 @@ func (h *httpRoomserverQueryAPI) QueryInvitesForUser(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryInvitesForUserPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryServerAllowedToSeeEvent implements RoomserverQueryAPI
@ -373,7 +411,7 @@ func (h *httpRoomserverQueryAPI) QueryServerAllowedToSeeEvent(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryServerAllowedToSeeEventPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryMissingEvents implements RoomServerQueryAPI
@ -386,7 +424,7 @@ func (h *httpRoomserverQueryAPI) QueryMissingEvents(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryStateAndAuthChain implements RoomserverQueryAPI
@ -399,49 +437,5 @@ func (h *httpRoomserverQueryAPI) QueryStateAndAuthChain(
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryStateAndAuthChainPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func postJSON(
ctx context.Context, span opentracing.Span, httpClient *http.Client,
apiURL string, request, response interface{},
) error {
jsonBytes, err := json.Marshal(request)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, apiURL, bytes.NewReader(jsonBytes))
if err != nil {
return err
}
// Mark the span as being an RPC client.
ext.SpanKindRPCClient.Set(span)
carrier := opentracing.HTTPHeadersCarrier(req.Header)
tracer := opentracing.GlobalTracer()
if err = tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := httpClient.Do(req.WithContext(ctx))
if res != nil {
defer (func() { err = res.Body.Close() })()
}
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
var errorBody struct {
Message string `json:"message"`
}
if err = json.NewDecoder(res.Body).Decode(&errorBody); err != nil {
return err
}
return fmt.Errorf("api: %d: %s", res.StatusCode, errorBody.Message)
}
return json.NewDecoder(res.Body).Decode(response)
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -227,6 +227,37 @@ func (r *RoomserverQueryAPI) loadEvents(
return result, nil
}
// QueryMembershipForUser implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryMembershipForUser(
ctx context.Context,
request *api.QueryMembershipForUserRequest,
response *api.QueryMembershipForUserResponse,
) error {
roomNID, err := r.DB.RoomNID(ctx, request.RoomID)
if err != nil {
return err
}
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, roomNID, request.UserID)
if err != nil {
return err
}
if membershipEventNID == 0 {
response.HasBeenInRoom = false
return nil
}
response.IsInRoom = stillInRoom
eventIDMap, err := r.DB.EventIDs(ctx, []types.EventNID{membershipEventNID})
if err != nil {
return err
}
response.EventID = eventIDMap[membershipEventNID]
return nil
}
// QueryMembershipsForRoom implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
ctx context.Context,
@ -240,7 +271,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, roomNID, request.Sender)
if err != nil {
return nil
return err
}
if membershipEventNID == 0 {
@ -593,6 +624,20 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.RoomserverQueryMembershipForUserPath,
common.MakeInternalAPI("QueryMembershipForUser", func(req *http.Request) util.JSONResponse {
var request api.QueryMembershipForUserRequest
var response api.QueryMembershipForUserResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.QueryMembershipForUser(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.RoomserverQueryMembershipsForRoomPath,
common.MakeInternalAPI("queryMembershipsForRoom", func(req *http.Request) util.JSONResponse {

View file

@ -19,8 +19,7 @@ import (
"encoding/json"
"testing"
"sort"
"github.com/matrix-org/dendrite/common/test"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -90,24 +89,6 @@ func (db *getEventDB) EventsFromIDs(ctx context.Context, eventIDs []string) (res
return
}
// Returns if the slices are equal after sorting them.
func compareUnsortedStringSlices(a []string, b []string) bool {
if len(a) != len(b) {
return false
}
sort.Strings(a)
sort.Strings(b)
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
func TestGetAuthChainSingle(t *testing.T) {
db := createEventDB()
@ -135,7 +116,7 @@ func TestGetAuthChainSingle(t *testing.T) {
expectedIDs := []string{"a", "b", "c", "d", "e"}
if !compareUnsortedStringSlices(expectedIDs, returnedIDs) {
if !test.UnsortedStringSliceEqual(expectedIDs, returnedIDs) {
t.Fatalf("returnedIDs got '%v', expected '%v'", returnedIDs, expectedIDs)
}
}
@ -168,7 +149,7 @@ func TestGetAuthChainMultiple(t *testing.T) {
expectedIDs := []string{"a", "b", "c", "d", "e", "f"}
if !compareUnsortedStringSlices(expectedIDs, returnedIDs) {
if !test.UnsortedStringSliceEqual(expectedIDs, returnedIDs) {
t.Fatalf("returnedIDs got '%v', expected '%v'", returnedIDs, expectedIDs)
}
}

View file

@ -18,6 +18,7 @@ import (
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common"
@ -34,21 +35,24 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
unstablemux := apiMux.PathPrefix(pathPrefixUnstable).Subrouter()
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
authData := auth.Data{nil, deviceDB, nil}
// TODO: Add AS support for all handlers below.
r0mux.Handle("/sync", common.MakeAuthAPI("sync", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return srp.OnIncomingSyncRequest(req, device)
})).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return OnIncomingStateRequest(req, syncDB, vars["roomID"])
})).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state/{type}", common.MakeAuthAPI("room_state", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
r0mux.Handle("/rooms/{roomID}/state/{type}", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], "")
})).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/state/{type}/{stateKey}", common.MakeAuthAPI("room_state", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
r0mux.Handle("/rooms/{roomID}/state/{type}/{stateKey}", common.MakeAuthAPI("room_state", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"])
})).Methods(http.MethodGet, http.MethodOptions)

View file

@ -0,0 +1,83 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package api provides the types that are used to communicate with the typing server.
package api
import (
"context"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go"
)
// InputTypingEvent is an event for notifying the typing server about typing updates.
type InputTypingEvent struct {
// UserID of the user to update typing status.
UserID string `json:"user_id"`
// RoomID of the room the user is typing (or has stopped).
RoomID string `json:"room_id"`
// Typing is true if the user is typing, false if they have stopped.
Typing bool `json:"typing"`
// Timeout is the interval for which the user should be marked as typing.
Timeout int64 `json:"timeout"`
// OriginServerTS when the server received the update.
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
}
// InputTypingEventRequest is a request to TypingServerInputAPI
type InputTypingEventRequest struct {
InputTypingEvent InputTypingEvent `json:"input_typing_event"`
}
// InputTypingEventResponse is a response to InputTypingEvents
type InputTypingEventResponse struct{}
// TypingServerInputAPI is used to write events to the typing server.
type TypingServerInputAPI interface {
InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
response *InputTypingEventResponse,
) error
}
// TypingServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
const TypingServerInputTypingEventPath = "/api/typingserver/input"
// NewTypingServerInputAPIHTTP creates a TypingServerInputAPI implemented by talking to a HTTP POST API.
func NewTypingServerInputAPIHTTP(typingServerURL string, httpClient *http.Client) TypingServerInputAPI {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &httpTypingServerInputAPI{typingServerURL, httpClient}
}
type httpTypingServerInputAPI struct {
typingServerURL string
httpClient *http.Client
}
// InputRoomEvents implements TypingServerInputAPI
func (h *httpTypingServerInputAPI) InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
response *InputTypingEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
defer span.Finish()
apiURL := h.typingServerURL + TypingServerInputTypingEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -0,0 +1,29 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package typingserver
import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver/api"
)
// SetupTypingServerComponent sets up and registers HTTP handlers for the
// TypingServer component. Returns instances of the various roomserver APIs,
// allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP.
func SetupTypingServerComponent(
base *basecomponent.BaseDendrite,
) api.TypingServerInputAPI {
// TODO: implement typing server
return base.CreateHTTPTypingServerAPIs()
}

2
vendor/manifest vendored
View file

@ -135,7 +135,7 @@
{
"importpath": "github.com/matrix-org/gomatrixserverlib",
"repository": "https://github.com/matrix-org/gomatrixserverlib",
"revision": "38a4f0f648bf357adc4bdb601cdc0535cee14e21",
"revision": "929828872b51e6733166553d6b1a20155b6ab829",
"branch": "master"
},
{

View file

@ -15,21 +15,8 @@
package gomatrixserverlib
// ApplicationServiceEvent is an event format that is sent off to an
// application service as part of a transaction.
type ApplicationServiceEvent struct {
Age int64 `json:"age,omitempty"`
Content RawJSON `json:"content,omitempty"`
EventID string `json:"event_id,omitempty"`
OriginServerTimestamp int64 `json:"origin_server_ts,omitempty"`
RoomID string `json:"room_id,omitempty"`
Sender string `json:"sender,omitempty"`
Type string `json:"type,omitempty"`
UserID string `json:"user_id,omitempty"`
}
// ApplicationServiceTransaction is the transaction that is sent off to an
// application service.
type ApplicationServiceTransaction struct {
Events []ApplicationServiceEvent `json:"events"`
Events []Event `json:"events"`
}

View file

@ -261,6 +261,9 @@ func newPowerLevelContentFromAuthEvents(authEvents AuthEventProvider, creatorUse
// If there is no power level event then the creator gets level 100
// https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L569
c.userLevels = map[string]int64{creatorUserID: 100}
// If there is no power level event then the state_default is level 0
// https://github.com/matrix-org/synapse/blob/v0.18.5/synapse/api/auth.py#L997
c.stateDefaultLevel = 0
return
}

View file

@ -37,6 +37,12 @@ type RespState struct {
AuthEvents []Event `json:"auth_chain"`
}
// A RespEventAuth is the content of a response to GET /_matrix/federation/v1/event_auth/{roomID}/{eventID}
type RespEventAuth struct {
// A list of events needed to authenticate the state events.
AuthEvents []Event `json:"auth_chain"`
}
// Events combines the auth events and the state events and returns
// them in an order where every event comes after its auth events.
// Each event will only appear once in the output list.