mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 16:33:11 -06:00
Merge branch 'master' into rob/media-tests
This commit is contained in:
commit
6b66cb06fc
|
|
@ -18,11 +18,14 @@ import (
|
|||
"encoding/base64"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/federationapi/config"
|
||||
"github.com/matrix-org/dendrite/federationapi/routing"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
|
|
@ -40,7 +43,10 @@ var (
|
|||
// openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\
|
||||
// python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")'
|
||||
//
|
||||
tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
|
||||
tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
|
||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
||||
roomserverURL = os.Getenv("ROOMSERVER_URL")
|
||||
roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
|
@ -57,6 +63,18 @@ func main() {
|
|||
log.Panic("No TLS_FINGERPRINT environment variable found.")
|
||||
}
|
||||
|
||||
if len(kafkaURIs) == 0 {
|
||||
// the kafka default is :9092
|
||||
kafkaURIs = []string{"localhost:9092"}
|
||||
}
|
||||
|
||||
if roomserverURL == "" {
|
||||
log.Panic("No ROOMSERVER_URL environment variable found.")
|
||||
}
|
||||
|
||||
if roomserverInputTopic == "" {
|
||||
log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.")
|
||||
}
|
||||
cfg := config.FederationAPI{
|
||||
ServerName: serverName,
|
||||
// TODO: make the validity period configurable.
|
||||
|
|
@ -75,6 +93,37 @@ func main() {
|
|||
}
|
||||
cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}}
|
||||
|
||||
routing.Setup(http.DefaultServeMux, cfg)
|
||||
federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
|
||||
|
||||
keyRing := gomatrixserverlib.KeyRing{
|
||||
KeyFetchers: []gomatrixserverlib.KeyFetcher{
|
||||
// TODO: Use perspective key fetchers for production.
|
||||
&gomatrixserverlib.DirectKeyFetcher{federation.Client},
|
||||
},
|
||||
KeyDatabase: &dummyKeyDatabase{},
|
||||
}
|
||||
queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil)
|
||||
|
||||
roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err)
|
||||
}
|
||||
|
||||
routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation)
|
||||
log.Fatal(http.ListenAndServe(bindAddr, nil))
|
||||
}
|
||||
|
||||
// TODO: Implement a proper key database.
|
||||
type dummyKeyDatabase struct{}
|
||||
|
||||
func (d *dummyKeyDatabase) FetchKeys(
|
||||
requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp,
|
||||
) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d *dummyKeyDatabase) StoreKeys(
|
||||
map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,124 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const usage = `Usage: %s
|
||||
|
||||
Create a single endpoint URL which remote matrix servers can be pointed at.
|
||||
|
||||
The server-server API in Dendrite is split across multiple processes
|
||||
which listen on multiple ports. You cannot point a Matrix server at
|
||||
any of those ports, as there will be unimplemented functionality.
|
||||
In addition, all server-server API processes start with the additional
|
||||
path prefix '/api', which Matrix servers will be unaware of.
|
||||
|
||||
This tool will proxy requests for all server-server URLs and forward
|
||||
them to their respective process. It will also add the '/api' path
|
||||
prefix to incoming requests.
|
||||
|
||||
THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE.
|
||||
|
||||
Arguments:
|
||||
|
||||
`
|
||||
|
||||
var (
|
||||
federationAPIURL = flag.String("federation-api-url", "", "The base URL of the listening 'dendrite-federation-api-server' process. E.g. 'http://localhost:4200'")
|
||||
bindAddress = flag.String("bind-address", ":8448", "The listening port for the proxy.")
|
||||
certFile = flag.String("tls-cert", "server.crt", "The PEM formatted X509 certificate to use for TLS")
|
||||
keyFile = flag.String("tls-key", "server.key", "The PEM private key to use for TLS")
|
||||
)
|
||||
|
||||
func makeProxy(targetURL string) (*httputil.ReverseProxy, error) {
|
||||
if !strings.HasSuffix(targetURL, "/") {
|
||||
targetURL += "/"
|
||||
}
|
||||
// Check that we can parse the URL.
|
||||
_, err := url.Parse(targetURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &httputil.ReverseProxy{
|
||||
Director: func(req *http.Request) {
|
||||
// URL.Path() removes the % escaping from the path.
|
||||
// The % encoding will be added back when the url is encoded
|
||||
// when the request is forwarded.
|
||||
// This means that we will lose any unessecary escaping from the URL.
|
||||
// Pratically this means that any distinction between '%2F' and '/'
|
||||
// in the URL will be lost by the time it reaches the target.
|
||||
path := req.URL.Path
|
||||
path = "api" + path
|
||||
log.WithFields(log.Fields{
|
||||
"path": path,
|
||||
"url": targetURL,
|
||||
"method": req.Method,
|
||||
}).Print("proxying request")
|
||||
newURL, err := url.Parse(targetURL + path)
|
||||
if err != nil {
|
||||
// We already checked that we can parse the URL
|
||||
// So this shouldn't ever get hit.
|
||||
panic(err)
|
||||
}
|
||||
// Copy the query parameters from the request.
|
||||
newURL.RawQuery = req.URL.RawQuery
|
||||
req.URL = newURL
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(os.Stderr, usage, os.Args[0])
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *federationAPIURL == "" {
|
||||
flag.Usage()
|
||||
fmt.Fprintln(os.Stderr, "no --federation-api-url specified.")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
federationProxy, err := makeProxy(*federationAPIURL)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
http.Handle("/", federationProxy)
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: *bindAddress,
|
||||
ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept)
|
||||
WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response
|
||||
}
|
||||
|
||||
fmt.Println("Proxying requests to:")
|
||||
fmt.Println(" /* => ", *federationAPIURL+"/api/*")
|
||||
fmt.Println("Listening on ", *bindAddress)
|
||||
panic(srv.ListenAndServeTLS(*certFile, *keyFile))
|
||||
}
|
||||
|
|
@ -16,21 +16,35 @@ package routing
|
|||
|
||||
import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/federationapi/config"
|
||||
"github.com/matrix-org/dendrite/federationapi/readers"
|
||||
"github.com/matrix-org/dendrite/federationapi/writers"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
pathPrefixV2Keys = "/_matrix/key/v2"
|
||||
pathPrefixV2Keys = "/_matrix/key/v2"
|
||||
pathPrefixV1Federation = "/_matrix/federation/v1"
|
||||
)
|
||||
|
||||
// Setup registers HTTP handlers with the given ServeMux.
|
||||
func Setup(servMux *http.ServeMux, cfg config.FederationAPI) {
|
||||
func Setup(
|
||||
servMux *http.ServeMux,
|
||||
cfg config.FederationAPI,
|
||||
query api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
) {
|
||||
apiMux := mux.NewRouter()
|
||||
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
|
||||
v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter()
|
||||
|
||||
localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse {
|
||||
return readers.LocalKeys(req, cfg)
|
||||
|
|
@ -43,6 +57,17 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) {
|
|||
v2keysmux.Handle("/server/{keyID}", localKeys)
|
||||
v2keysmux.Handle("/server/", localKeys)
|
||||
|
||||
v1fedmux.Handle("/send/{txnID}/", makeAPI("send",
|
||||
func(req *http.Request) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return writers.Send(
|
||||
req, gomatrixserverlib.TransactionID(vars["txnID"]),
|
||||
time.Now(),
|
||||
cfg, query, producer, keys, federation,
|
||||
)
|
||||
},
|
||||
))
|
||||
|
||||
servMux.Handle("/metrics", prometheus.Handler())
|
||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||
}
|
||||
|
|
|
|||
211
src/github.com/matrix-org/dendrite/federationapi/writers/send.go
Normal file
211
src/github.com/matrix-org/dendrite/federationapi/writers/send.go
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
package writers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/federationapi/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Send implements /_matrix/federation/v1/send/{txnID}
|
||||
func Send(
|
||||
req *http.Request,
|
||||
txnID gomatrixserverlib.TransactionID,
|
||||
now time.Time,
|
||||
cfg config.FederationAPI,
|
||||
query api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
) util.JSONResponse {
|
||||
request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys)
|
||||
if request == nil {
|
||||
return errResp
|
||||
}
|
||||
|
||||
t := txnReq{
|
||||
query: query,
|
||||
producer: producer,
|
||||
keys: keys,
|
||||
federation: federation,
|
||||
}
|
||||
if err := json.Unmarshal(request.Content(), &t); err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
t.Origin = request.Origin()
|
||||
t.TransactionID = txnID
|
||||
t.Destination = cfg.ServerName
|
||||
|
||||
resp, err := t.processTransaction()
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: resp,
|
||||
}
|
||||
}
|
||||
|
||||
type txnReq struct {
|
||||
gomatrixserverlib.Transaction
|
||||
query api.RoomserverQueryAPI
|
||||
producer *producers.RoomserverProducer
|
||||
keys gomatrixserverlib.KeyRing
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
}
|
||||
|
||||
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
|
||||
// Check the event signatures
|
||||
if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Process the events.
|
||||
results := map[string]gomatrixserverlib.PDUResult{}
|
||||
for _, e := range t.PDUs {
|
||||
err := t.processEvent(e)
|
||||
if err != nil {
|
||||
// If the error is due to the event itself being bad then we skip
|
||||
// it and move onto the next event. We report an error so that the
|
||||
// sender knows that we have skipped processing it.
|
||||
//
|
||||
// However if the event is due to a temporary failure in our server
|
||||
// such as a database being unavailable then we should bail, and
|
||||
// hope that the sender will retry when we are feeling better.
|
||||
//
|
||||
// It is uncertain what we should do if an event fails because
|
||||
// we failed to fetch more information from the sending server.
|
||||
// For example if a request to /state fails.
|
||||
// If we skip the event then we risk missing the event until we
|
||||
// receive another event referencing it.
|
||||
// If we bail and stop processing then we risk wedging incoming
|
||||
// transactions from that server forever.
|
||||
switch err.(type) {
|
||||
case unknownRoomError:
|
||||
case *gomatrixserverlib.NotAllowed:
|
||||
default:
|
||||
// Any other error should be the result of a temporary error in
|
||||
// our server so we should bail processing the transaction entirely.
|
||||
return nil, err
|
||||
}
|
||||
results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()}
|
||||
} else {
|
||||
results[e.EventID()] = gomatrixserverlib.PDUResult{}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Process the EDUs.
|
||||
|
||||
return &gomatrixserverlib.RespSend{PDUs: results}, nil
|
||||
}
|
||||
|
||||
type unknownRoomError struct {
|
||||
roomID string
|
||||
}
|
||||
|
||||
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
|
||||
|
||||
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
|
||||
refs := e.PrevEvents()
|
||||
prevEventIDs := make([]string, len(refs))
|
||||
for i := range refs {
|
||||
prevEventIDs[i] = refs[i].EventID
|
||||
}
|
||||
|
||||
// Fetch the state needed to authenticate the event.
|
||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
||||
stateReq := api.QueryStateAfterEventsRequest{
|
||||
RoomID: e.RoomID(),
|
||||
PrevEventIDs: prevEventIDs,
|
||||
StateToFetch: needed.Tuples(),
|
||||
}
|
||||
var stateResp api.QueryStateAfterEventsResponse
|
||||
if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !stateResp.RoomExists {
|
||||
// TODO: When synapse receives a message for a room it is not in it
|
||||
// asks the remote server for the state of the room so that it can
|
||||
// check if the remote server knows of a join "m.room.member" event
|
||||
// that this server is unaware of.
|
||||
// However generally speaking we should reject events for rooms we
|
||||
// aren't a member of.
|
||||
return unknownRoomError{e.RoomID()}
|
||||
}
|
||||
|
||||
if !stateResp.PrevEventsExist {
|
||||
return t.processEventWithMissingState(e)
|
||||
}
|
||||
|
||||
// Check that the event is allowed by the state at the event.
|
||||
if err := checkAllowedByState(e, stateResp.StateEvents); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Check that the roomserver has a copy of all of the auth_events.
|
||||
// TODO: Check that the event is allowed by its auth_events.
|
||||
|
||||
// pass the event to the roomserver
|
||||
if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
|
||||
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
|
||||
for i := range stateEvents {
|
||||
authUsingState.AddEvent(&stateEvents[i])
|
||||
}
|
||||
return gomatrixserverlib.Allowed(e, &authUsingState)
|
||||
}
|
||||
|
||||
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
|
||||
// We are missing the previous events for this events.
|
||||
// This means that there is a gap in our view of the history of the
|
||||
// room. There two ways that we can handle such a gap:
|
||||
// 1) We can fill in the gap using /get_missing_events
|
||||
// 2) We can leave the gap and request the state of the room at
|
||||
// this event from the remote server using either /state_ids
|
||||
// or /state.
|
||||
// Synapse will attempt to do 1 and if that fails or if the gap is
|
||||
// too large then it will attempt 2.
|
||||
// Synapse will use /state_ids if possible since ususally the state
|
||||
// is largely unchanged and it is more efficient to fetch a list of
|
||||
// event ids and then use /event to fetch the individual events.
|
||||
// However not all version of synapse support /state_ids so you may
|
||||
// need to fallback to /state.
|
||||
// TODO: Attempt to fill in the gap using /get_missing_events
|
||||
// TODO: Attempt to fetch the state using /state_ids and /events
|
||||
state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Check that the returned state is valid.
|
||||
if err := state.Check(t.keys); err != nil {
|
||||
return err
|
||||
}
|
||||
// Check that the event is allowed by the state.
|
||||
if err := checkAllowedByState(e, state.StateEvents); err != nil {
|
||||
return err
|
||||
}
|
||||
// pass the event along with the state to the roomserver
|
||||
if err := t.producer.SendEventWithState(state, e); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -28,7 +28,7 @@ type RoomEventDatabase interface {
|
|||
StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error)
|
||||
// Lookup the state entries for a list of string event IDs
|
||||
// Returns an error if the there is an error talking to the database
|
||||
// or if the event IDs aren't in the database.
|
||||
// Returns a types.MissingEventError if the event IDs aren't in the database.
|
||||
StateEntriesForEventIDs(eventIDs []string) ([]types.StateEntry, error)
|
||||
// Set the state at an event.
|
||||
SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
||||
|
|
|
|||
|
|
@ -97,9 +97,12 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
|
|||
|
||||
prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs)
|
||||
if err != nil {
|
||||
// TODO: Check if the error was because we are missing events from the
|
||||
// database or are missing state at events from the database.
|
||||
return err
|
||||
switch err.(type) {
|
||||
case types.MissingEventError:
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
response.PrevEventsExist = true
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ type RoomStateDatabase interface {
|
|||
AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
||||
// Lookup the state of a room at each event for a list of string event IDs.
|
||||
// Returns an error if there is an error talking to the database
|
||||
// or if the room state for the event IDs aren't in the database
|
||||
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
|
||||
StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, error)
|
||||
// Lookup the numeric IDs for a list of string event types.
|
||||
// Returns a map from string event type to numeric ID for the event type.
|
||||
|
|
|
|||
|
|
@ -166,6 +166,8 @@ func (s *eventStatements) selectEvent(eventID string) (types.EventNID, types.Sta
|
|||
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
||||
}
|
||||
|
||||
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
||||
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||
func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) {
|
||||
rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
|
|
@ -194,11 +196,16 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S
|
|||
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
|
||||
// If this turns out to be impossible and we do need the debug information here, it would be better
|
||||
// to do it as a separate query rather than slowing down/complicating the common case.
|
||||
return nil, fmt.Errorf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs))
|
||||
return nil, types.MissingEventError(
|
||||
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
|
||||
)
|
||||
}
|
||||
return results, err
|
||||
}
|
||||
|
||||
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
||||
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
||||
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
||||
func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types.StateAtEvent, error) {
|
||||
rows, err := s.bulkSelectStateAtEventByIDStmt.Query(pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
|
|
@ -218,11 +225,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types
|
|||
return nil, err
|
||||
}
|
||||
if result.BeforeStateSnapshotNID == 0 {
|
||||
return nil, fmt.Errorf("storage: missing state for event NID %d", result.EventNID)
|
||||
return nil, types.MissingEventError(
|
||||
fmt.Sprintf("storage: missing state for event NID %d", result.EventNID),
|
||||
)
|
||||
}
|
||||
}
|
||||
if i != len(eventIDs) {
|
||||
return nil, fmt.Errorf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs))
|
||||
return nil, types.MissingEventError(
|
||||
fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)),
|
||||
)
|
||||
}
|
||||
return results, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,3 +168,9 @@ type RoomRecentEventsUpdater interface {
|
|||
// Rollback the transaction.
|
||||
Rollback() error
|
||||
}
|
||||
|
||||
// A MissingEventError is an error that happened because the roomserver was
|
||||
// missing requested events from its database.
|
||||
type MissingEventError string
|
||||
|
||||
func (e MissingEventError) Error() string { return string(e) }
|
||||
|
|
|
|||
|
|
@ -20,6 +20,8 @@ import (
|
|||
|
||||
// Sync contains the config information necessary to spin up a sync-server process.
|
||||
type Sync struct {
|
||||
// Where the room server is listening for queries.
|
||||
RoomserverURL string `yaml:"roomserver_url"`
|
||||
// The topic for events which are written by the room server output log.
|
||||
RoomserverOutputTopic string `yaml:"roomserver_topic"`
|
||||
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package consumers
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
|
@ -33,6 +34,7 @@ type OutputRoomEvent struct {
|
|||
roomServerConsumer *common.ContinualConsumer
|
||||
db *storage.SyncServerDatabase
|
||||
notifier *sync.Notifier
|
||||
query api.RoomserverQueryAPI
|
||||
}
|
||||
|
||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||
|
|
@ -51,6 +53,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS
|
|||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
notifier: n,
|
||||
query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil),
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
|
|
@ -84,7 +87,19 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
"room_id": ev.RoomID(),
|
||||
}).Info("received event from roomserver")
|
||||
|
||||
syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs)
|
||||
addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
"add": output.AddsStateEventIDs,
|
||||
"del": output.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: state event lookup failure")
|
||||
}
|
||||
|
||||
syncStreamPos, err := s.db.WriteEvent(
|
||||
&ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
|
|
@ -100,3 +115,74 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// lookupStateEvents looks up the state events that are added by a new event.
|
||||
func (s *OutputRoomEvent) lookupStateEvents(
|
||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||
) ([]gomatrixserverlib.Event, error) {
|
||||
// Fast path if there aren't any new state events.
|
||||
if len(addsStateEventIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Fast path if the only state event added is the event itself.
|
||||
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
||||
return []gomatrixserverlib.Event{event}, nil
|
||||
}
|
||||
|
||||
// Check if this is re-adding a state events that we previously processed
|
||||
// If we have previously received a state event it may still be in
|
||||
// our event database.
|
||||
result, err := s.db.Events(addsStateEventIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
missing := missingEventsFrom(result, addsStateEventIDs)
|
||||
|
||||
// Check if event itself is being added.
|
||||
for _, eventID := range missing {
|
||||
if eventID == event.EventID() {
|
||||
result = append(result, event)
|
||||
break
|
||||
}
|
||||
}
|
||||
missing = missingEventsFrom(result, addsStateEventIDs)
|
||||
|
||||
if len(missing) == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// At this point the missing events are neither the event itself nor are
|
||||
// they present in our local database. Our only option is to fetch them
|
||||
// from the roomserver using the query API.
|
||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
||||
var eventResp api.QueryEventsByIDResponse
|
||||
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result = append(result, eventResp.Events...)
|
||||
missing = missingEventsFrom(result, addsStateEventIDs)
|
||||
|
||||
if len(missing) != 0 {
|
||||
return nil, fmt.Errorf(
|
||||
"missing %d state events IDs at event %q", len(missing), event.EventID(),
|
||||
)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
|
||||
have := map[string]bool{}
|
||||
for _, event := range events {
|
||||
have[event.EventID()] = true
|
||||
}
|
||||
var missing []string
|
||||
for _, eventID := range required {
|
||||
if !have[eventID] {
|
||||
missing = append(missing, eventID)
|
||||
}
|
||||
}
|
||||
return missing
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
|
@ -35,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state (
|
|||
-- The 'content.membership' value if this event is an m.room.member event. For other
|
||||
-- events, this will be NULL.
|
||||
membership TEXT,
|
||||
-- The serial ID of the output_room_events table when this event became
|
||||
-- part of the current state of the room.
|
||||
added_at BIGINT,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
|
|
@ -45,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key,
|
|||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT ON CONSTRAINT room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6"
|
||||
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM current_room_state WHERE event_id = $1"
|
||||
|
|
@ -61,12 +66,16 @@ const selectCurrentStateSQL = "" +
|
|||
const selectJoinedUsersSQL = "" +
|
||||
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectCurrentStateStmt *sql.Stmt
|
||||
selectJoinedUsersStmt *sql.Stmt
|
||||
selectEventsWithEventIDsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||
|
|
@ -89,6 +98,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -141,6 +153,33 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri
|
|||
}
|
||||
defer rows.Close()
|
||||
|
||||
return rowsToEvents(rows)
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error {
|
||||
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) upsertRoomState(
|
||||
txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64,
|
||||
) error {
|
||||
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
|
||||
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
|
||||
rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
return rowsToStreamEvents(rows)
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
||||
var result []gomatrixserverlib.Event
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
|
|
@ -156,15 +195,3 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri
|
|||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error {
|
||||
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error {
|
||||
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
|
||||
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/lib/pq"
|
||||
|
|
@ -193,7 +192,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
events, err := rowsToEvents(rows)
|
||||
events, err := rowsToStreamEvents(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -205,23 +204,19 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
|||
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
||||
// from the database.
|
||||
func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
|
||||
rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs))
|
||||
stmt := s.selectEventsStmt
|
||||
if txn != nil {
|
||||
stmt = txn.Stmt(stmt)
|
||||
}
|
||||
rows, err := stmt.Query(pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
result, err := rowsToEvents(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(result) != len(eventIDs) {
|
||||
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs))
|
||||
}
|
||||
return result, nil
|
||||
return rowsToStreamEvents(rows)
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) {
|
||||
func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
|
||||
var result []streamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package storage
|
|||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
|
|
@ -75,10 +76,24 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error
|
|||
return d.roomstate.selectJoinedUsers()
|
||||
}
|
||||
|
||||
// Events lookups a list of event by their event ID.
|
||||
// Returns a list of events matching the requested IDs found in the database.
|
||||
// If an event is not found in the database then it will be omitted from the list.
|
||||
// Returns an error if there was a problem talking with the database
|
||||
func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
||||
streamEvents, err := d.events.selectEvents(nil, eventIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return streamEventsToEvents(streamEvents), nil
|
||||
}
|
||||
|
||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
||||
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
||||
// Returns an error if there was a problem inserting this event.
|
||||
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) {
|
||||
func (d *SyncServerDatabase) WriteEvent(
|
||||
ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string,
|
||||
) (streamPos types.StreamPosition, returnErr error) {
|
||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||
var err error
|
||||
pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
|
||||
|
|
@ -87,31 +102,19 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve
|
|||
}
|
||||
streamPos = types.StreamPosition(pos)
|
||||
|
||||
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
|
||||
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
|
||||
// Nothing to do, the event may have just been a message event.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update the current room state based on the added/removed state event IDs.
|
||||
// In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event.
|
||||
// However, conflict resolution may result in there being different events being added, or even some removed.
|
||||
if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() {
|
||||
// common case
|
||||
return d.updateRoomState(txn, nil, []gomatrixserverlib.Event{*ev})
|
||||
}
|
||||
|
||||
// uncommon case: we need to fetch the full event for each event ID mentioned, then update room state
|
||||
added, err := d.events.selectEvents(txn, addStateEventIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.updateRoomState(txn, removeStateEventIDs, streamEventsToEvents(added))
|
||||
return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error {
|
||||
func (d *SyncServerDatabase) updateRoomState(
|
||||
txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition,
|
||||
) error {
|
||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||
for _, eventID := range removedEventIDs {
|
||||
if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil {
|
||||
|
|
@ -132,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri
|
|||
}
|
||||
membership = &memberContent.Membership
|
||||
}
|
||||
if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil {
|
||||
if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
@ -310,7 +313,7 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma
|
|||
for _, missingEvIDs := range missingEvents {
|
||||
allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
|
||||
}
|
||||
evs, err := d.events.selectEvents(txn, allMissingEventIDs)
|
||||
evs, err := d.fetchMissingStateEvents(txn, allMissingEventIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -323,6 +326,45 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma
|
|||
return stateBetween, nil
|
||||
}
|
||||
|
||||
func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
|
||||
// Fetch from the events table first so we pick up the stream ID for the
|
||||
// event.
|
||||
events, err := d.events.selectEvents(txn, eventIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
have := map[string]bool{}
|
||||
for _, event := range events {
|
||||
have[event.EventID()] = true
|
||||
}
|
||||
var missing []string
|
||||
for _, eventID := range eventIDs {
|
||||
if !have[eventID] {
|
||||
missing = append(missing, eventID)
|
||||
}
|
||||
}
|
||||
if len(missing) == 0 {
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// If they are missing from the events table then they should be state
|
||||
// events that we received from outside the main event stream.
|
||||
// These should be in the room state table.
|
||||
stateEvents, err := d.roomstate.selectEventsWithEventIDs(txn, missing)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(stateEvents) != len(missing) {
|
||||
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
|
||||
}
|
||||
for _, e := range stateEvents {
|
||||
events = append(events, e)
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
|
||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||
// - Get membership list changes for this user in this sync response
|
||||
|
|
|
|||
Loading…
Reference in a new issue