mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-02-25 14:03:09 -06:00
Merge branch 'master' into rob/media-upload
This commit is contained in:
commit
da7feaf741
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/syncapi/routing"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
|
|
@ -71,12 +72,13 @@ func main() {
|
|||
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err)
|
||||
}
|
||||
|
||||
rp, err := sync.NewRequestPool(db)
|
||||
pos, err := db.SyncStreamPosition()
|
||||
if err != nil {
|
||||
log.Panicf("startup: Failed to create request pool : %s", err)
|
||||
log.Panicf("startup: failed to get latest sync stream position : %s", err)
|
||||
}
|
||||
|
||||
server, err := consumers.NewServer(cfg, rp, db)
|
||||
n := sync.NewNotifier(types.StreamPosition(pos))
|
||||
server, err := consumers.NewServer(cfg, n, db)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create sync server: %s", err)
|
||||
}
|
||||
|
|
@ -85,6 +87,6 @@ func main() {
|
|||
}
|
||||
|
||||
log.Info("Starting sync server on ", *bindAddr)
|
||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp)
|
||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n))
|
||||
log.Fatal(http.ListenAndServe(*bindAddr, nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI,
|
|||
|
||||
r0mux.Handle("/download/{serverName}/{mediaId}",
|
||||
prometheus.InstrumentHandler("download", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
util.SetupRequestLogging(req)
|
||||
req = util.RequestWithLogging(req)
|
||||
|
||||
// Set common headers returned regardless of the outcome of the request
|
||||
util.SetCORSHeaders(w)
|
||||
|
|
|
|||
|
|
@ -32,11 +32,11 @@ import (
|
|||
type Server struct {
|
||||
roomServerConsumer *common.ContinualConsumer
|
||||
db *storage.SyncServerDatabase
|
||||
rp *sync.RequestPool
|
||||
notifier *sync.Notifier
|
||||
}
|
||||
|
||||
// NewServer creates a new sync server. Call Start() to begin consuming from room servers.
|
||||
func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServerDatabase) (*Server, error) {
|
||||
func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) {
|
||||
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -50,7 +50,7 @@ func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServer
|
|||
s := &Server{
|
||||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
rp: rp,
|
||||
notifier: n,
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
|
|
@ -96,7 +96,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}).Panicf("roomserver output log: write event failure")
|
||||
return nil
|
||||
}
|
||||
s.rp.OnNewEvent(&ev, types.StreamPosition(syncStreamPos))
|
||||
s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
67
src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
Normal file
67
src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// Notifier will wake up sleeping requests in the request pool when there
|
||||
// is some new data. It does not tell requests what that data is, only the
|
||||
// stream position which they can use to get at it.
|
||||
type Notifier struct {
|
||||
// The latest sync stream position: guarded by 'cond'.
|
||||
currPos types.StreamPosition
|
||||
// A condition variable to notify all waiting goroutines of a new sync stream position
|
||||
cond *sync.Cond
|
||||
}
|
||||
|
||||
// NewNotifier creates a new notifier set to the given stream position.
|
||||
func NewNotifier(pos types.StreamPosition) *Notifier {
|
||||
return &Notifier{
|
||||
pos,
|
||||
sync.NewCond(&sync.Mutex{}),
|
||||
}
|
||||
}
|
||||
|
||||
// OnNewEvent is called when a new event is received from the room server. Must only be
|
||||
// called from a single goroutine, to avoid races between updates which could set the
|
||||
// current position in the stream incorrectly.
|
||||
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
|
||||
// update the current position in a guard and then notify all /sync streams
|
||||
n.cond.L.Lock()
|
||||
n.currPos = pos
|
||||
n.cond.L.Unlock()
|
||||
|
||||
n.cond.Broadcast() // notify ALL waiting goroutines
|
||||
}
|
||||
|
||||
// WaitForEvents blocks until there are new events for this request.
|
||||
func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
|
||||
// In a guard, check if the /sync request should block, and block it until we get a new position
|
||||
n.cond.L.Lock()
|
||||
currentPos := n.currPos
|
||||
for req.since == currentPos {
|
||||
// we need to wait for a new event.
|
||||
// TODO: This waits for ANY new event, we need to only wait for events which we care about.
|
||||
n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
|
||||
currentPos = n.currPos
|
||||
}
|
||||
n.cond.L.Unlock()
|
||||
return currentPos
|
||||
}
|
||||
|
|
@ -16,7 +16,6 @@ package sync
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
|
|
@ -31,20 +30,13 @@ import (
|
|||
|
||||
// RequestPool manages HTTP long-poll connections for /sync
|
||||
type RequestPool struct {
|
||||
db *storage.SyncServerDatabase
|
||||
// The latest sync stream position: guarded by 'cond'.
|
||||
currPos types.StreamPosition
|
||||
// A condition variable to notify all waiting goroutines of a new sync stream position
|
||||
cond *sync.Cond
|
||||
db *storage.SyncServerDatabase
|
||||
notifier *Notifier
|
||||
}
|
||||
|
||||
// NewRequestPool makes a new RequestPool
|
||||
func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) {
|
||||
pos, err := db.SyncStreamPosition()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &RequestPool{db, types.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil
|
||||
func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool {
|
||||
return &RequestPool{db, n}
|
||||
}
|
||||
|
||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||
|
|
@ -106,34 +98,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
|||
}
|
||||
}
|
||||
|
||||
// OnNewEvent is called when a new event is received from the room server. Must only be
|
||||
// called from a single goroutine, to avoid races between updates which could set the
|
||||
// current position in the stream incorrectly.
|
||||
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
|
||||
// update the current position in a guard and then notify all /sync streams
|
||||
rp.cond.L.Lock()
|
||||
rp.currPos = pos
|
||||
rp.cond.L.Unlock()
|
||||
|
||||
rp.cond.Broadcast() // notify ALL waiting goroutines
|
||||
}
|
||||
|
||||
func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition {
|
||||
// In a guard, check if the /sync request should block, and block it until we get a new position
|
||||
rp.cond.L.Lock()
|
||||
currentPos := rp.currPos
|
||||
for req.since == currentPos {
|
||||
// we need to wait for a new event.
|
||||
// TODO: This waits for ANY new event, we need to only wait for events which we care about.
|
||||
rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock
|
||||
currentPos = rp.currPos
|
||||
}
|
||||
rp.cond.L.Unlock()
|
||||
return currentPos
|
||||
}
|
||||
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
||||
currentPos := rp.waitForEvents(req)
|
||||
currentPos := rp.notifier.WaitForEvents(req)
|
||||
|
||||
if req.since == types.StreamPosition(0) {
|
||||
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
||||
|
|
|
|||
4
vendor/manifest
vendored
4
vendor/manifest
vendored
|
|
@ -98,7 +98,7 @@
|
|||
{
|
||||
"importpath": "github.com/matrix-org/util",
|
||||
"repository": "https://github.com/matrix-org/util",
|
||||
"revision": "8b11d9882e131d58ff40525c8f7b9a7f4b811a43",
|
||||
"revision": "53326ed5598b226681112cbd441f59f3cffc9c82",
|
||||
"branch": "master"
|
||||
},
|
||||
{
|
||||
|
|
@ -212,4 +212,4 @@
|
|||
"branch": "v2"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,10 +93,10 @@ func Protect(handler http.HandlerFunc) http.HandlerFunc {
|
|||
}
|
||||
}
|
||||
|
||||
// SetupRequestLogging sets up standard logging for http.Requests.
|
||||
// RequestWithLogging sets up standard logging for http.Requests.
|
||||
// http.Requests will have a logger (with a request ID/method/path logged) attached to the Context.
|
||||
// This can be accessed via GetLogger(Context).
|
||||
func SetupRequestLogging(req *http.Request) {
|
||||
func RequestWithLogging(req *http.Request) *http.Request {
|
||||
reqID := RandomString(12)
|
||||
// Set a Logger and request ID on the context
|
||||
ctx := context.WithValue(req.Context(), ctxValueLogger, log.WithFields(log.Fields{
|
||||
|
|
@ -109,6 +109,8 @@ func SetupRequestLogging(req *http.Request) {
|
|||
|
||||
logger := GetLogger(req.Context())
|
||||
logger.Print("Incoming request")
|
||||
|
||||
return req
|
||||
}
|
||||
|
||||
// MakeJSONAPI creates an HTTP handler which always responds to incoming requests with JSON responses.
|
||||
|
|
@ -116,7 +118,7 @@ func SetupRequestLogging(req *http.Request) {
|
|||
// This can be accessed via GetLogger(Context).
|
||||
func MakeJSONAPI(handler JSONRequestHandler) http.HandlerFunc {
|
||||
return Protect(func(w http.ResponseWriter, req *http.Request) {
|
||||
SetupRequestLogging(req)
|
||||
req = RequestWithLogging(req)
|
||||
|
||||
if req.Method == "OPTIONS" {
|
||||
SetCORSHeaders(w)
|
||||
|
|
|
|||
Loading…
Reference in a new issue