mediaapi/writers/download: Try to only request remote files once

If multiple requests arrive for the same remote file, we want to
download it once and then serve all the remaining incoming requests
from the cache.

The main thing missing from the code at this point is a mechanism to
time out database queries. They are made across a network and so we
should be robust to network connectivity issues. This is a general
problem across dendrite and not limited to just this code.
This commit is contained in:
Robert Swain 2017-05-10 16:04:39 +02:00
parent b280722591
commit 49ec095b59
3 changed files with 81 additions and 17 deletions

View file

@ -16,6 +16,7 @@ package routing
import ( import (
"net/http" "net/http"
"sync"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/mediaapi/config" "github.com/matrix-org/dendrite/mediaapi/config"
@ -37,6 +38,9 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI,
return writers.Upload(req, cfg, db) return writers.Upload(req, cfg, db)
}))) })))
activeRemoteRequests := &types.ActiveRemoteRequests{
Set: map[string]*sync.Cond{},
}
r0mux.Handle("/download/{serverName}/{mediaId}", r0mux.Handle("/download/{serverName}/{mediaId}",
prometheus.InstrumentHandler("download", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { prometheus.InstrumentHandler("download", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req = util.RequestWithLogging(req) req = util.RequestWithLogging(req)
@ -47,7 +51,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI,
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
vars := mux.Vars(req) vars := mux.Vars(req)
writers.Download(w, req, types.ServerName(vars["serverName"]), types.MediaID(vars["mediaId"]), cfg, db) writers.Download(w, req, types.ServerName(vars["serverName"]), types.MediaID(vars["mediaId"]), cfg, db, activeRemoteRequests)
})), })),
) )

View file

@ -14,6 +14,8 @@
package types package types
import "sync"
// ContentDisposition is an HTTP Content-Disposition header string // ContentDisposition is an HTTP Content-Disposition header string
type ContentDisposition string type ContentDisposition string
@ -55,3 +57,10 @@ type MediaMetadata struct {
UploadName Filename UploadName Filename
UserID MatrixUserID UserID MatrixUserID
} }
// ActiveRemoteRequests is a lockable map of media URIs requested from remote homeservers.
// It is used for ensuring multiple requests for the same file do not clobber each other:
// the first goroutine to request a given mxc URI inserts a Cond keyed on that URI, fetches
// the file, then broadcasts on the Cond and removes the entry; concurrent requesters find
// the existing Cond and Wait on it instead of fetching again.
type ActiveRemoteRequests struct {
	// Mutex guards Set and is also the Locker backing each *sync.Cond stored in it,
	// so Cond.Wait releases and re-acquires this same lock.
	sync.Mutex
	// Set maps an mxc URI ("mxc://<origin>/<mediaID>") to the Cond that waiters
	// for that in-flight download block on. Entries exist only while a fetch is active.
	Set map[string]*sync.Cond
}

View file

@ -25,6 +25,7 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"sync"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -80,13 +81,15 @@ var errRead = fmt.Errorf("failed to read response from remote server")
var errResponse = fmt.Errorf("failed to write file data to response body") var errResponse = fmt.Errorf("failed to write file data to response body")
var errWrite = fmt.Errorf("failed to write file to disk") var errWrite = fmt.Errorf("failed to write file to disk")
var nAttempts = 5
// Download implements /download // Download implements /download
// Files from this server (i.e. origin == cfg.ServerName) are served directly // Files from this server (i.e. origin == cfg.ServerName) are served directly
// Files from remote servers (i.e. origin != cfg.ServerName) are cached locally. // Files from remote servers (i.e. origin != cfg.ServerName) are cached locally.
// If they are present in the cache, they are served directly. // If they are present in the cache, they are served directly.
// If they are not present in the cache, they are obtained from the remote server and // If they are not present in the cache, they are obtained from the remote server and
// simultaneously served back to the client and written into the cache. // simultaneously served back to the client and written into the cache.
func Download(w http.ResponseWriter, req *http.Request, origin types.ServerName, mediaID types.MediaID, cfg config.MediaAPI, db *storage.Database) { func Download(w http.ResponseWriter, req *http.Request, origin types.ServerName, mediaID types.MediaID, cfg config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) {
logger := util.GetLogger(req.Context()) logger := util.GetLogger(req.Context())
// request validation // request validation
@ -124,7 +127,38 @@ func Download(w http.ResponseWriter, req *http.Request, origin types.ServerName,
"Origin": r.MediaMetadata.Origin, "Origin": r.MediaMetadata.Origin,
}).Infof("Fetching remote file") }).Infof("Fetching remote file")
// TODO: lock request in hash set mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID)
for attempts := 0; ; attempts++ {
activeRemoteRequests.Lock()
err = db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin, r.MediaMetadata)
if err == nil {
// If we have a record, we can respond from the local file
respondFromLocalFile(w, logger, r.MediaMetadata, cfg)
activeRemoteRequests.Unlock()
return
}
if activeRemoteRequestCondition, ok := activeRemoteRequests.Set[mxcURL]; ok {
if attempts >= nAttempts {
logger.Warnf("Other goroutines are trying to download the remote file and failing.")
jsonErrorResponse(w, util.JSONResponse{
Code: 500,
JSON: jsonerror.Unknown(fmt.Sprintf("File with media ID %q could not be downloaded from %q", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)),
}, logger)
return
}
logger.WithFields(log.Fields{
"Origin": r.MediaMetadata.Origin,
"MediaID": r.MediaMetadata.MediaID,
}).Infof("Waiting for another goroutine to fetch the file.")
activeRemoteRequestCondition.Wait()
activeRemoteRequests.Unlock()
} else {
activeRemoteRequests.Set[mxcURL] = &sync.Cond{L: activeRemoteRequests}
activeRemoteRequests.Unlock()
break
}
}
// FIXME: Only request once (would race if multiple requests for the same remote file) // FIXME: Only request once (would race if multiple requests for the same remote file)
// Use a hash set based on the origin and media ID (the request URL should be fine...) and synchronise adding / removing members // Use a hash set based on the origin and media ID (the request URL should be fine...) and synchronise adding / removing members
@ -319,20 +353,7 @@ func Download(w http.ResponseWriter, req *http.Request, origin types.ServerName,
"Content-Disposition": r.MediaMetadata.ContentDisposition, "Content-Disposition": r.MediaMetadata.ContentDisposition,
}).Infof("Storing file metadata to media repository database") }).Infof("Storing file metadata to media repository database")
// if written to disk, add to db // The database is the source of truth so we need to have moved the file first
err = db.StoreMediaMetadata(r.MediaMetadata)
if err != nil {
tmpDirErr := os.RemoveAll(string(tmpDir))
if tmpDirErr != nil {
logger.Warnf("Failed to remove tmpDir (%v): %q\n", tmpDir, tmpDirErr)
}
return
}
// TODO: unlock request in hash set
// TODO: generate thumbnails
err = moveFile( err = moveFile(
types.Path(path.Join(string(tmpDir), "content")), types.Path(path.Join(string(tmpDir), "content")),
types.Path(getPathFromMediaMetadata(r.MediaMetadata, cfg.BasePath)), types.Path(getPathFromMediaMetadata(r.MediaMetadata, cfg.BasePath)),
@ -345,6 +366,36 @@ func Download(w http.ResponseWriter, req *http.Request, origin types.ServerName,
return return
} }
// Writing the metadata to the media repository database and removing the mxcURL from activeRemoteRequests needs to be atomic.
// If it were not atomic, a new request for the same file could come in in routine A and check the database before the INSERT.
// Routine B which was fetching could then have its INSERT complete and remove the mxcURL from the activeRemoteRequests.
// If routine A then checked the activeRemoteRequests it would think it needed to fetch the file when it's already in the database.
// The locking below mitigates this situation.
activeRemoteRequests.Lock()
// FIXME: unlock after timeout of db request
// if written to disk, add to db
err = db.StoreMediaMetadata(r.MediaMetadata)
if err != nil {
finalDir := path.Dir(getPathFromMediaMetadata(r.MediaMetadata, cfg.BasePath))
finalDirErr := os.RemoveAll(finalDir)
if finalDirErr != nil {
logger.Warnf("Failed to remove finalDir (%v): %q\n", finalDir, finalDirErr)
}
delete(activeRemoteRequests.Set, mxcURL)
activeRemoteRequests.Unlock()
return
}
activeRemoteRequestCondition, _ := activeRemoteRequests.Set[mxcURL]
logger.WithFields(log.Fields{
"Origin": r.MediaMetadata.Origin,
"MediaID": r.MediaMetadata.MediaID,
}).Infof("Signalling other goroutines waiting for us to fetch the file.")
activeRemoteRequestCondition.Broadcast()
delete(activeRemoteRequests.Set, mxcURL)
activeRemoteRequests.Unlock()
// TODO: generate thumbnails
logger.WithFields(log.Fields{ logger.WithFields(log.Fields{
"MediaID": r.MediaMetadata.MediaID, "MediaID": r.MediaMetadata.MediaID,
"Origin": r.MediaMetadata.Origin, "Origin": r.MediaMetadata.Origin,