App service filters roomserver events

Doing so based on namespace regexes that each app service has defined.

To get the aliases for a roomID a new aliasAPI endpoint was defined,
GetAliasesFromRoomID, which does exactly what it says on the tin.

Next step is to queue events to be sent off to each homeserver.
This commit is contained in:
Andrew Morgan 2018-05-24 18:20:29 +01:00
parent 3d8de687e3
commit ce210d167d
11 changed files with 195 additions and 66 deletions

View file

@ -36,10 +36,10 @@ func SetupAppServiceAPIComponent(
transactionsCache *transactions.Cache,
) {
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, queryAPI,
base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service's room server consumer")
logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
}
// Set up HTTP Endpoints

View file

@ -29,11 +29,16 @@ import (
sarama "gopkg.in/Shopify/sarama.v1"
)
var (
appServices []config.ApplicationService
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *accounts.Database
query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI
serverName string
}
@ -43,7 +48,9 @@ func NewOutputRoomEventConsumer(
kafkaConsumer sarama.Consumer,
store *accounts.Database,
queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
) *OutputRoomEventConsumer {
appServices = cfg.Derived.ApplicationServices
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@ -54,6 +61,7 @@ func NewOutputRoomEventConsumer(
roomServerConsumer: &consumer,
db: store,
query: queryAPI,
alias: aliasAPI,
serverName: string(cfg.Matrix.ServerName),
}
consumer.ProcessMessage = s.onMessage
@ -86,7 +94,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
ev := output.NewRoomEvent.Event
fmt.Println("got event", ev)
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
@ -98,7 +105,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return err
}
return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs)
if err = s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs); err != nil {
return err
}
// Check if any events need to passed on to external application services
return s.filterRoomserverEvents(events)
}
// lookupStateEvents looks up the state events that are added by a new event.
@ -144,3 +156,55 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
return result, nil
}
// filterRoomserverEvents takes in events and decides whether any of them need
// to be passed on to an external application service. It does this by checking
// each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(events []gomatrixserverlib.Event) error {
for _, event := range events {
for _, appservice := range appServices {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(event, appservice) {
// TODO: Queue this event to be sent off to the application service
fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID())
}
}
}
return nil
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(event gomatrixserverlib.Event, appservice config.ApplicationService) bool {
// Check sender of the event
for _, userNamespace := range appservice.NamespaceMap["users"] {
if userNamespace.RegexpObject.MatchString(event.Sender()) {
return true
}
}
// Check room id of the event
for _, roomNamespace := range appservice.NamespaceMap["rooms"] {
if roomNamespace.RegexpObject.MatchString(event.RoomID()) {
return true
}
}
// Check all known room aliases of the room the event came from
queryReq := api.GetAliasesForRoomIDRequest{RoomID: event.RoomID()}
var queryRes api.GetAliasesForRoomIDResponse
if err := s.alias.GetAliasesForRoomID(context.TODO(), &queryReq, &queryRes); err == nil {
for _, alias := range queryRes.Aliases {
for _, aliasNamespace := range appservice.NamespaceMap["aliases"] {
if aliasNamespace.RegexpObject.MatchString(alias) {
return true
}
}
}
}
return false
}

View file

@ -17,7 +17,6 @@ package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common"
@ -86,7 +85,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
ev := output.NewRoomEvent.Event
fmt.Println("ClientAPI got an event:", ev)
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),

View file

@ -46,9 +46,9 @@ func DirectoryRoom(
var resp gomatrixserverlib.RespDirectory
if domain == cfg.Matrix.ServerName {
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
var queryRes api.GetAliasRoomIDResponse
if err = aliasAPI.GetAliasRoomID(req.Context(), &queryReq, &queryRes); err != nil {
queryReq := api.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes api.GetRoomIDForAliasResponse
if err = aliasAPI.GetRoomIDForAlias(req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(req, err)
}

View file

@ -145,9 +145,9 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
}
}
if domain == r.cfg.Matrix.ServerName {
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
var queryRes api.GetAliasRoomIDResponse
if err = r.aliasAPI.GetAliasRoomID(r.req.Context(), &queryReq, &queryRes); err != nil {
queryReq := api.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes api.GetRoomIDForAliasResponse
if err = r.aliasAPI.GetRoomIDForAlias(r.req.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(r.req, err)
}

View file

@ -21,7 +21,7 @@ import (
"regexp"
"strings"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"
)
// ApplicationServiceNamespace is the namespace that a specific application
@ -79,7 +79,8 @@ func loadAppservices(config *Dendrite) error {
// Append the parsed application service to the global config
config.Derived.ApplicationServices = append(
config.Derived.ApplicationServices, appservice)
config.Derived.ApplicationServices, appservice,
)
}
// Check for any errors in the loaded application services
@ -89,7 +90,7 @@ func loadAppservices(config *Dendrite) error {
// setupRegexps will create regex objects for exclusive and non-exclusive
// usernames, aliases and rooms of all application services, so that other
// methods can quickly check if a particular string matches any of them.
func setupRegexps(cfg *Dendrite) {
func setupRegexps(cfg *Dendrite) (err error) {
// Combine all exclusive namespaces for later string checking
var exclusiveUsernameStrings, exclusiveAliasStrings, exclusiveRoomStrings []string
@ -129,9 +130,17 @@ func setupRegexps(cfg *Dendrite) {
}
// Store compiled Regex
cfg.Derived.ExclusiveApplicationServicesUsernameRegexp, _ = regexp.Compile(exclusiveUsernames)
cfg.Derived.ExclusiveApplicationServicesUsernameRegexp, _ = regexp.Compile(exclusiveAliases)
cfg.Derived.ExclusiveApplicationServicesUsernameRegexp, _ = regexp.Compile(exclusiveRooms)
if cfg.Derived.ExclusiveApplicationServicesUsernameRegexp, err = regexp.Compile(exclusiveUsernames); err != nil {
return err
}
if cfg.Derived.ExclusiveApplicationServicesAliasRegexp, err = regexp.Compile(exclusiveAliases); err != nil {
return err
}
if cfg.Derived.ExclusiveApplicationServicesRoomRegexp, err = regexp.Compile(exclusiveRooms); err != nil {
return err
}
return nil
}
// appendExclusiveNamespaceRegexs takes a slice of strings and a slice of
@ -140,7 +149,7 @@ func setupRegexps(cfg *Dendrite) {
func appendExclusiveNamespaceRegexs(
exclusiveStrings *[]string, namespaces []ApplicationServiceNamespace,
) {
for _, namespace := range namespaces {
for index, namespace := range namespaces {
if namespace.Exclusive {
// We append parenthesis to later separate each regex when we compile
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
@ -148,13 +157,13 @@ func appendExclusiveNamespaceRegexs(
}
// Compile this regex into a Regexp object for later use
namespace.RegexpObject, _ = regexp.Compile(namespace.Regex)
namespaces[index].RegexpObject, _ = regexp.Compile(namespace.Regex)
}
}
// checkErrors checks for any configuration errors amongst the loaded
// application services according to the application service spec.
func checkErrors(config *Dendrite) error {
func checkErrors(config *Dendrite) (err error) {
var idMap = make(map[string]bool)
var tokenMap = make(map[string]bool)
@ -190,9 +199,9 @@ func checkErrors(config *Dendrite) error {
}
}
}
setupRegexps(config)
return nil
err = setupRegexps(config)
return err
}
// IsValidRegex returns true or false based on whether the

View file

@ -30,7 +30,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ed25519"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"
jaegerconfig "github.com/uber/jaeger-client-go/config"
jaegermetrics "github.com/uber/jaeger-lib/metrics"
@ -235,12 +235,19 @@ type Dendrite struct {
// The paths of which were given above in the main config file
ApplicationServices []ApplicationService
// A meta-regex compiled from all exclusive Application Service
// Regexes. When a user registers, we check that their username
// does not match any exclusive Application Service namespaces
// Meta-regexes compiled from all exclusive Application Service
// Regexes.
//
// When a user registers, we check that their username does not match any
// exclusive Application Service namespaces
ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
// TODO: Exclusive alias, room regexp's
// When a user creates a room alias, we check that it isn't already
// reserved by an application service
ExclusiveApplicationServicesAliasRegexp *regexp.Regexp
// TODO: Huh? When a room ID is created, we block the client from
// creating it if it falls under an application service's exclusive
// roomID regex? But why tho
ExclusiveApplicationServicesRoomRegexp *regexp.Regexp
} `yaml:"-"`
}

View file

@ -52,9 +52,9 @@ func RoomAliasToID(
var resp gomatrixserverlib.RespDirectory
if domain == cfg.Matrix.ServerName {
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
var queryRes api.GetAliasRoomIDResponse
if err = aliasAPI.GetAliasRoomID(httpReq.Context(), &queryReq, &queryRes); err != nil {
queryReq := api.GetRoomIDForAliasRequest{Alias: roomAlias}
var queryRes api.GetRoomIDForAliasResponse
if err = aliasAPI.GetRoomIDForAlias(httpReq.Context(), &queryReq, &queryRes); err != nil {
return httputil.LogThenError(httpReq, err)
}

View file

@ -35,10 +35,10 @@ type RoomserverAliasAPIDatabase interface {
SetRoomAlias(ctx context.Context, alias string, roomID string) error
// Look up the room ID a given alias refers to.
// Returns an error if there was a problem talking to the database.
GetRoomIDFromAlias(ctx context.Context, alias string) (string, error)
GetRoomIDForAlias(ctx context.Context, alias string) (string, error)
// Look up all aliases referring to a given room ID.
// Returns an error if there was a problem talking to the database.
GetAliasesFromRoomID(ctx context.Context, roomID string) ([]string, error)
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
// Remove a given room alias.
// Returns an error if there was a problem talking to the database.
RemoveRoomAlias(ctx context.Context, alias string) error
@ -59,7 +59,7 @@ func (r *RoomserverAliasAPI) SetRoomAlias(
response *api.SetRoomAliasResponse,
) error {
// Check if the alias isn't already referring to a room
roomID, err := r.DB.GetRoomIDFromAlias(ctx, request.Alias)
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
if err != nil {
return err
}
@ -82,14 +82,14 @@ func (r *RoomserverAliasAPI) SetRoomAlias(
return r.sendUpdatedAliasesEvent(context.TODO(), request.UserID, request.RoomID)
}
// GetAliasRoomID implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) GetAliasRoomID(
// GetRoomIDForAlias implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) GetRoomIDForAlias(
ctx context.Context,
request *api.GetAliasRoomIDRequest,
response *api.GetAliasRoomIDResponse,
request *api.GetRoomIDForAliasRequest,
response *api.GetRoomIDForAliasResponse,
) error {
// Look up the room ID in the database
roomID, err := r.DB.GetRoomIDFromAlias(ctx, request.Alias)
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
if err != nil {
return err
}
@ -98,6 +98,22 @@ func (r *RoomserverAliasAPI) GetAliasRoomID(
return nil
}
// GetAliasesForRoomID implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) GetAliasesForRoomID(
ctx context.Context,
request *api.GetAliasesForRoomIDRequest,
response *api.GetAliasesForRoomIDResponse,
) error {
// Look up the aliases in the database for the given RoomID
aliases, err := r.DB.GetAliasesForRoomID(ctx, request.RoomID)
if err != nil {
return err
}
response.Aliases = aliases
return nil
}
// RemoveRoomAlias implements api.RoomserverAliasAPI
func (r *RoomserverAliasAPI) RemoveRoomAlias(
ctx context.Context,
@ -105,7 +121,7 @@ func (r *RoomserverAliasAPI) RemoveRoomAlias(
response *api.RemoveRoomAliasResponse,
) error {
// Look up the room ID in the database
roomID, err := r.DB.GetRoomIDFromAlias(ctx, request.Alias)
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
if err != nil {
return err
}
@ -142,7 +158,7 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(
// Retrieve the updated list of aliases, marhal it and set it as the
// event's content
aliases, err := r.DB.GetAliasesFromRoomID(ctx, roomID)
aliases, err := r.DB.GetAliasesForRoomID(ctx, roomID)
if err != nil {
return err
}
@ -229,14 +245,14 @@ func (r *RoomserverAliasAPI) SetupHTTP(servMux *http.ServeMux) {
}),
)
servMux.Handle(
api.RoomserverGetAliasRoomIDPath,
common.MakeInternalAPI("getAliasRoomID", func(req *http.Request) util.JSONResponse {
var request api.GetAliasRoomIDRequest
var response api.GetAliasRoomIDResponse
api.RoomserverGetRoomIDForAliasPath,
common.MakeInternalAPI("GetRoomIDForAlias", func(req *http.Request) util.JSONResponse {
var request api.GetRoomIDForAliasRequest
var response api.GetRoomIDForAliasResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := r.GetAliasRoomID(req.Context(), &request, &response); err != nil {
if err := r.GetRoomIDForAlias(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}

View file

@ -37,18 +37,30 @@ type SetRoomAliasResponse struct {
AliasExists bool `json:"alias_exists"`
}
// GetAliasRoomIDRequest is a request to GetAliasRoomID
type GetAliasRoomIDRequest struct {
// GetRoomIDForAliasRequest is a request to GetRoomIDForAlias
type GetRoomIDForAliasRequest struct {
// Alias we want to lookup
Alias string `json:"alias"`
}
// GetAliasRoomIDResponse is a response to GetAliasRoomID
type GetAliasRoomIDResponse struct {
// GetRoomIDForAliasResponse is a response to GetRoomIDForAlias
type GetRoomIDForAliasResponse struct {
// The room ID the alias refers to
RoomID string `json:"room_id"`
}
// GetAliasesForRoomIDRequest is a request to GetAliasesForRoomID
type GetAliasesForRoomIDRequest struct {
// The room ID we want to find aliases for
RoomID string `json:"room_id"`
}
// GetAliasesForRoomIDResponse is a response to GetAliasesForRoomID
type GetAliasesForRoomIDResponse struct {
// The aliases the alias refers to
Aliases []string `json:"aliases"`
}
// RemoveRoomAliasRequest is a request to RemoveRoomAlias
type RemoveRoomAliasRequest struct {
// ID of the user removing the alias
@ -70,10 +82,17 @@ type RoomserverAliasAPI interface {
) error
// Get the room ID for an alias
GetAliasRoomID(
GetRoomIDForAlias(
ctx context.Context,
req *GetAliasRoomIDRequest,
response *GetAliasRoomIDResponse,
req *GetRoomIDForAliasRequest,
response *GetRoomIDForAliasResponse,
) error
// Get all known aliases for a room ID
GetAliasesForRoomID(
ctx context.Context,
req *GetAliasesForRoomIDRequest,
response *GetAliasesForRoomIDResponse,
) error
// Remove a room alias
@ -87,8 +106,11 @@ type RoomserverAliasAPI interface {
// RoomserverSetRoomAliasPath is the HTTP path for the SetRoomAlias API.
const RoomserverSetRoomAliasPath = "/api/roomserver/setRoomAlias"
// RoomserverGetAliasRoomIDPath is the HTTP path for the GetAliasRoomID API.
const RoomserverGetAliasRoomIDPath = "/api/roomserver/getAliasRoomID"
// RoomserverGetRoomIDForAliasPath is the HTTP path for the GetRoomIDForAlias API.
const RoomserverGetRoomIDForAliasPath = "/api/roomserver/GetRoomIDForAlias"
// RoomserverGetAliasesForRoomIDPath is the HTTP path for the GetAliasesForRoomID API.
const RoomserverGetAliasesForRoomIDPath = "/api/roomserver/GetAliasesForRoomID"
// RoomserverRemoveRoomAliasPath is the HTTP path for the RemoveRoomAlias API.
const RoomserverRemoveRoomAliasPath = "/api/roomserver/removeRoomAlias"
@ -120,16 +142,29 @@ func (h *httpRoomserverAliasAPI) SetRoomAlias(
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// GetAliasRoomID implements RoomserverAliasAPI
func (h *httpRoomserverAliasAPI) GetAliasRoomID(
// GetRoomIDForAlias implements RoomserverAliasAPI
func (h *httpRoomserverAliasAPI) GetRoomIDForAlias(
ctx context.Context,
request *GetAliasRoomIDRequest,
response *GetAliasRoomIDResponse,
request *GetRoomIDForAliasRequest,
response *GetRoomIDForAliasResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetAliasRoomID")
span, ctx := opentracing.StartSpanFromContext(ctx, "GetRoomIDForAlias")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverGetAliasRoomIDPath
apiURL := h.roomserverURL + RoomserverGetRoomIDForAliasPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// GetAliasesForRoomID implements RoomserverAliasAPI
func (h *httpRoomserverAliasAPI) GetAliasesForRoomID(
ctx context.Context,
request *GetAliasesForRoomIDRequest,
response *GetAliasesForRoomIDResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetAliasesForRoomID")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverGetAliasesForRoomIDPath
return postJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -422,13 +422,13 @@ func (d *Database) SetRoomAlias(ctx context.Context, alias string, roomID string
return d.statements.insertRoomAlias(ctx, alias, roomID)
}
// GetRoomIDFromAlias implements alias.RoomserverAliasAPIDB
func (d *Database) GetRoomIDFromAlias(ctx context.Context, alias string) (string, error) {
// GetRoomIDForAlias implements alias.RoomserverAliasAPIDB
func (d *Database) GetRoomIDForAlias(ctx context.Context, alias string) (string, error) {
return d.statements.selectRoomIDFromAlias(ctx, alias)
}
// GetAliasesFromRoomID implements alias.RoomserverAliasAPIDB
func (d *Database) GetAliasesFromRoomID(ctx context.Context, roomID string) ([]string, error) {
// GetAliasesForRoomID implements alias.RoomserverAliasAPIDB
func (d *Database) GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) {
return d.statements.selectAliasesFromRoomID(ctx, roomID)
}