mediaapi/fileutils: Move fileutils to its own package

Author: Robert Swain, 2017-05-22 14:12:11 +02:00
commit 8684f80ebd (parent 7bdcece102)
3 changed files with 32 additions and 28 deletions

File 1: the fileutils source, moved from package writers into its own fileutils package, with its shared helpers exported:

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package writers
+package fileutils

 import (
     "bufio"
@@ -32,7 +32,8 @@ import (
     "github.com/matrix-org/dendrite/mediaapi/types"
 )

-func removeDir(dir types.Path, logger *log.Entry) {
+// RemoveDir removes a directory and logs a warning in case of errors
+func RemoveDir(dir types.Path, logger *log.Entry) {
     dirErr := os.RemoveAll(string(dir))
     if dirErr != nil {
         logger.WithError(dirErr).WithField("dir", dir).Warn("Failed to remove directory")
@@ -77,7 +78,8 @@ func createTempFileWriter(absBasePath types.Path) (*bufio.Writer, *os.File, types.Path, error) {
 }

 var (
-    errFileIsTooLarge = fmt.Errorf("file is too large")
+    // ErrFileIsTooLarge indicates that the uploaded file is larger than the configured maximum file size
+    ErrFileIsTooLarge = fmt.Errorf("file is too large")
     errRead           = fmt.Errorf("failed to read response from remote server")
     errResponse       = fmt.Errorf("failed to write file data to response body")
     errHash           = fmt.Errorf("failed to hash file data")
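Note: renaming errFileIsTooLarge to ErrFileIsTooLarge exports the sentinel so callers outside the new package can compare against it directly, as the download and upload writers do further down. A minimal self-contained sketch of the pattern (names illustrative, not the fileutils API):

package main

import "fmt"

// ErrTooLarge mirrors the exported-sentinel idea: a fixed error value that
// callers in other packages compare against instead of parsing error text.
var ErrTooLarge = fmt.Errorf("file is too large")

func write(bytesWritten, maxBytes int64) error {
    if bytesWritten > maxBytes {
        return ErrTooLarge
    }
    return nil
}

func main() {
    if err := write(2048, 1024); err == ErrTooLarge {
        fmt.Println("rejecting upload:", err) // e.g. log the configured limit here
    }
}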
@@ -104,7 +106,7 @@ func writeToResponse(respWriter http.ResponseWriter, buffer []byte, bytesToWrite int) (int64, error) {
 func writeToDiskAndHasher(tmpFileWriter *bufio.Writer, hasher hash.Hash, bytesWritten int64, maxFileSizeBytes types.ContentLength, buffer []byte, bytesToWrite int) (int64, error) {
     // if larger than maxFileSizeBytes then stop writing to disk and discard cached file
     if bytesWritten+int64(bytesToWrite) > int64(maxFileSizeBytes) {
-        return 0, errFileIsTooLarge
+        return 0, ErrFileIsTooLarge
     }
     // write to hasher and to disk
     bytesTemp, writeErr := tmpFileWriter.Write(buffer[:bytesToWrite])
@@ -117,7 +119,7 @@ func writeToDiskAndHasher(tmpFileWriter *bufio.Writer, hasher hash.Hash, bytesWritten int64, maxFileSizeBytes types.ContentLength, buffer []byte, bytesToWrite int) (int64, error) {
     return int64(bytesTemp), nil
 }

-// readAndHashAndWriteWithLimit works like io.Copy except it copies from the reqReader to the
+// ReadAndHashAndWriteWithLimit works like io.Copy except it copies from the reqReader to the
 // optionally-supplied respWriter and a temporary file named 'content' using a bufio.Writer.
 // The data written to disk is hashed using the SHA-256 algorithm.
 // If there is an error with the reqReader or the respWriter, that is considered an error.
@@ -127,7 +129,7 @@ func writeToDiskAndHasher(tmpFileWriter *bufio.Writer, hasher hash.Hash, bytesWritten int64, maxFileSizeBytes types.ContentLength, buffer []byte, bytesToWrite int) (int64, error) {
 // If a respWriter is provided, all the data will be proxied from the reqReader to
 // the respWriter, regardless of errors or limits on writing to disk.
 // Returns all of the hash sum, bytes written to disk, and temporary directory path, or an error.
-func readAndHashAndWriteWithLimit(reqReader io.Reader, maxFileSizeBytes types.ContentLength, absBasePath types.Path, respWriter http.ResponseWriter) (types.Base64Hash, types.ContentLength, types.ContentLength, types.Path, error) {
+func ReadAndHashAndWriteWithLimit(reqReader io.Reader, maxFileSizeBytes types.ContentLength, absBasePath types.Path, respWriter http.ResponseWriter) (types.Base64Hash, types.ContentLength, types.ContentLength, types.Path, error) {
     // create the temporary file writer
     tmpFileWriter, tmpFile, tmpDir, err := createTempFileWriter(absBasePath)
     if err != nil {
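Note: the doc comment above describes a copy loop that hashes with SHA-256 while enforcing a byte limit. A compressed sketch of just the hash-plus-limit core, assuming a plain io.Reader and omitting the temp-file and response plumbing the real function handles:

package main

import (
    "crypto/sha256"
    "encoding/base64"
    "fmt"
    "io"
    "strings"
)

// hashWithLimit copies from r into a SHA-256 hasher, reading at most max+1
// bytes so that exceeding the cap is detectable.
func hashWithLimit(r io.Reader, max int64) (string, int64, error) {
    hasher := sha256.New()
    n, err := io.Copy(hasher, io.LimitReader(r, max+1))
    if err != nil {
        return "", n, err
    }
    if n > max {
        return "", n, fmt.Errorf("file is too large")
    }
    return base64.URLEncoding.EncodeToString(hasher.Sum(nil)), n, nil
}

func main() {
    sum, n, err := hashWithLimit(strings.NewReader("hello"), 1024)
    fmt.Println(sum, n, err)
}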
@@ -157,7 +159,7 @@ func readAndHashAndWriteWithLimit(reqReader io.Reader, maxFileSizeBytes types.ContentLength, absBasePath types.Path, respWriter http.ResponseWriter) (types.Base64Hash, types.ContentLength, types.ContentLength, types.Path, error) {
         bytesTemp, copyError = writeToResponse(respWriter, buffer, bytesRead)
         bytesResponded += bytesTemp
         if copyError == nil {
-            // Note: if we get here then copyError != errFileIsTooLarge && copyError != errWrite
+            // Note: if we get here then copyError != ErrFileIsTooLarge && copyError != errWrite
             // as if copyError == errResponse || copyError == errWrite then we would have broken
             // out of the loop and there are no other cases
             bytesTemp, copyError = writeToDiskAndHasher(tmpFileWriter, hasher, bytesWritten, maxFileSizeBytes, buffer, (bytesRead))
@@ -186,7 +188,7 @@ func readAndHashAndWriteWithLimit(reqReader io.Reader, maxFileSizeBytes types.ContentLength, absBasePath types.Path, respWriter http.ResponseWriter) (types.Base64Hash, types.ContentLength, types.ContentLength, types.Path, error) {
     return types.Base64Hash(base64.URLEncoding.EncodeToString(hash[:])), types.ContentLength(bytesResponded), types.ContentLength(bytesWritten), tmpDir, nil
 }

-// getPathFromMediaMetadata validates and constructs the on-disk path to the media
+// GetPathFromMediaMetadata validates and constructs the on-disk path to the media
 // based on its origin and mediaID
 // If a mediaID is too short, which could happen for other homeserver implementations,
 // place it into a short-id subdirectory of the origin directory
@@ -195,7 +197,7 @@ func readAndHashAndWriteWithLimit(reqReader io.Reader, maxFileSizeBytes types.ContentLength, absBasePath types.Path, respWriter http.ResponseWriter) (types.Base64Hash, types.ContentLength, types.ContentLength, types.Path, error) {
 // mediaID is 'qwerty', we create subdirectories 'q', 'w' within 'q' and place the file
 // in 'q/w' calling it 'erty'. If the mediaID is shorter than 3 characters, the last
 // character is the file name and the preceding character, if any, is a subdirectory name.
-func getPathFromMediaMetadata(m *types.MediaMetadata, absBasePath types.Path) (string, error) {
+func GetPathFromMediaMetadata(m *types.MediaMetadata, absBasePath types.Path) (string, error) {
     var subPath, fileName string

     hashLen := len(m.Base64Hash)
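Note: the comment above defines the directory fan-out. A runnable sketch of that sharding rule (shardPath is an illustrative helper, not the fileutils implementation):

package main

import (
    "fmt"
    "path/filepath"
)

// shardPath applies the scheme documented above: first character and second
// character become nested subdirectories, the remainder is the file name;
// identifiers shorter than 3 characters use the last character as the file
// name and the preceding character, if any, as a subdirectory.
func shardPath(base, id string) string {
    switch len(id) {
    case 0:
        return ""
    case 1:
        return filepath.Join(base, id)
    case 2:
        return filepath.Join(base, id[0:1], id[1:2])
    default:
        return filepath.Join(base, id[0:1], id[1:2], id[2:])
    }
}

func main() {
    fmt.Println(shardPath("/media", "qwerty")) // /media/q/w/erty
}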
@@ -253,16 +255,16 @@ func moveFile(src types.Path, dst types.Path) error {
     return nil
 }

-// moveFileWithHashCheck attempts to move the file src to dst and checks for hash collisions based on metadata
+// MoveFileWithHashCheck attempts to move the file src to dst and checks for hash collisions based on metadata
 // Check if destination file exists. As the destination is based on a hash of the file data,
 // if it exists and the content length does not match then there is a hash collision for two different files. If
 // it exists and the content length matches, it is believable that it is the same file and we can just
 // discard the temporary file.
-func moveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath types.Path, logger *log.Entry) (string, bool, error) {
+func MoveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath types.Path, logger *log.Entry) (string, bool, error) {
     duplicate := false
-    finalPath, err := getPathFromMediaMetadata(mediaMetadata, absBasePath)
+    finalPath, err := GetPathFromMediaMetadata(mediaMetadata, absBasePath)
     if err != nil {
-        removeDir(tmpDir, logger)
+        RemoveDir(tmpDir, logger)
         return "", duplicate, fmt.Errorf("failed to get file path from metadata: %q", err)
     }
@@ -270,11 +272,11 @@ func moveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath types.Path, logger *log.Entry) (string, bool, error) {
     if stat, err = os.Stat(finalPath); os.IsExist(err) {
         duplicate = true
         if stat.Size() == int64(mediaMetadata.ContentLength) {
-            removeDir(tmpDir, logger)
+            RemoveDir(tmpDir, logger)
             return finalPath, duplicate, nil
         }
         // Remove the tmpDir as we anyway cannot cache the file on disk due to the hash collision
-        removeDir(tmpDir, logger)
+        RemoveDir(tmpDir, logger)
         return "", duplicate, fmt.Errorf("downloaded file with hash collision but different file size (%v)", finalPath)
     }
     err = moveFile(
@@ -282,7 +284,7 @@ func moveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath types.Path, logger *log.Entry) (string, bool, error) {
         types.Path(finalPath),
     )
     if err != nil {
-        removeDir(tmpDir, logger)
+        RemoveDir(tmpDir, logger)
         return "", duplicate, fmt.Errorf("failed to move file to final destination (%v): %q", finalPath, err)
     }
     return finalPath, duplicate, nil
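Note: MoveFileWithHashCheck's dedup rests on the destination path being derived from the content hash: an existing file of equal size is assumed to be the same content, while an equal hash with a different size signals a collision. A sketch of that check under standard os.Stat semantics (a nil error means the file exists; the helper name is illustrative):

package main

import (
    "fmt"
    "os"
)

// checkDuplicate reports whether a file already occupies the hash-derived
// destination, and errors on a same-hash-different-size collision.
func checkDuplicate(finalPath string, contentLength int64) (bool, error) {
    stat, err := os.Stat(finalPath)
    if err != nil {
        if os.IsNotExist(err) {
            return false, nil // nothing there yet; safe to move into place
        }
        return false, err
    }
    if stat.Size() == contentLength {
        return true, nil // believable duplicate; discard the temp file
    }
    return true, fmt.Errorf("hash collision with different file size (%v)", finalPath)
}

func main() {
    dup, err := checkDuplicate("/tmp/does-not-exist", 42)
    fmt.Println(dup, err)
}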

File 2: the download writer, updated to call the exported fileutils functions:

@@ -30,6 +30,7 @@ import (
     log "github.com/Sirupsen/logrus"
     "github.com/matrix-org/dendrite/clientapi/jsonerror"
     "github.com/matrix-org/dendrite/mediaapi/config"
+    "github.com/matrix-org/dendrite/mediaapi/fileutils"
     "github.com/matrix-org/dendrite/mediaapi/storage"
     "github.com/matrix-org/dendrite/mediaapi/types"
     "github.com/matrix-org/gomatrixserverlib"
@@ -183,7 +184,7 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePath types.Path
         "Content-Disposition": r.MediaMetadata.ContentDisposition,
     }).Infof("Downloading file")

-    filePath, err := getPathFromMediaMetadata(r.MediaMetadata, absBasePath)
+    filePath, err := fileutils.GetPathFromMediaMetadata(r.MediaMetadata, absBasePath)
     if err != nil {
         // FIXME: Remove erroneous file from database?
         r.Logger.WithError(err).Warn("Failed to get file path from metadata")
@@ -341,7 +342,7 @@ func (r *downloadRequest) commitFileAndMetadata(tmpDir types.Path, absBasePath types.Path
     }).Info("Storing file metadata to media repository database")

     // The database is the source of truth so we need to have moved the file first
-    finalPath, duplicate, err := moveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger)
+    finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger)
     if err != nil {
         r.Logger.WithError(err).Error("Failed to move file.")
         return updateActiveRemoteRequests
@@ -367,7 +368,7 @@ func (r *downloadRequest) commitFileAndMetadata(tmpDir types.Path, absBasePath types.Path
     // remove the file if it is not a duplicate.
     if duplicate == false {
         finalDir := path.Dir(finalPath)
-        removeDir(types.Path(finalDir), r.Logger)
+        fileutils.RemoveDir(types.Path(finalDir), r.Logger)
     }
     completeRemoteRequest(activeRemoteRequests, mxcURL)
     return updateActiveRemoteRequests
@@ -447,18 +448,18 @@ func (r *downloadRequest) respondFromRemoteFile(w http.ResponseWriter, absBasePath types.Path
     // integrity checks on the file data in the repository.
     // bytesResponded is the total number of bytes written to the response to the client request
     // bytesWritten is the total number of bytes written to disk
-    hash, bytesResponded, bytesWritten, tmpDir, copyError := readAndHashAndWriteWithLimit(resp.Body, maxFileSizeBytes, absBasePath, w)
+    hash, bytesResponded, bytesWritten, tmpDir, copyError := fileutils.ReadAndHashAndWriteWithLimit(resp.Body, maxFileSizeBytes, absBasePath, w)

     if copyError != nil {
         logFields := log.Fields{
             "MediaID": r.MediaMetadata.MediaID,
             "Origin":  r.MediaMetadata.Origin,
         }
-        if copyError == errFileIsTooLarge {
+        if copyError == fileutils.ErrFileIsTooLarge {
             logFields["MaxFileSizeBytes"] = maxFileSizeBytes
         }
         r.Logger.WithError(copyError).WithFields(logFields).Warn("Error while transferring file")
-        removeDir(tmpDir, r.Logger)
+        fileutils.RemoveDir(tmpDir, r.Logger)
         // Note: if we have responded with any data in the body at all then we have already sent 200 OK and we can only abort at this point
         if bytesResponded < 1 {
             r.jsonErrorResponse(w, util.JSONResponse{
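Note: the bytesResponded < 1 guard reflects an HTTP constraint: once any body bytes have been proxied to the client, the 200 OK status line is already on the wire and a JSON error response can no longer be sent. A sketch of that decision (handler shape and names assumed for illustration):

package main

import "net/http"

// respondCopyError sketches the branch above: before any body bytes are
// written we can still send a real error status; afterwards we can only
// abort the stream and let the client see a truncated body.
func respondCopyError(w http.ResponseWriter, bytesResponded int64, copyError error) {
    if bytesResponded < 1 {
        http.Error(w, copyError.Error(), http.StatusBadGateway)
        return
    }
    // Mid-stream failure: stop writing; headers cannot be changed now.
}

func main() {}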

File 3: the upload writer, updated likewise:

@@ -26,6 +26,7 @@ import (
     "github.com/matrix-org/dendrite/clientapi/auth"
     "github.com/matrix-org/dendrite/clientapi/jsonerror"
     "github.com/matrix-org/dendrite/mediaapi/config"
+    "github.com/matrix-org/dendrite/mediaapi/fileutils"
     "github.com/matrix-org/dendrite/mediaapi/storage"
     "github.com/matrix-org/dendrite/mediaapi/types"
     "github.com/matrix-org/util"
@@ -142,7 +143,7 @@ func parseAndValidateRequest(req *http.Request, cfg *config.MediaAPI) (*uploadRequest
 // In case of any error, appropriate files and directories are cleaned up a
 // util.JSONResponse error is returned.
 func storeFileAndMetadata(tmpDir types.Path, absBasePath types.Path, mediaMetadata *types.MediaMetadata, db *storage.Database, logger *log.Entry) *util.JSONResponse {
-    finalPath, duplicate, err := moveFileWithHashCheck(tmpDir, mediaMetadata, absBasePath, logger)
+    finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, mediaMetadata, absBasePath, logger)
     if err != nil {
         logger.WithError(err).Error("Failed to move file.")
         return &util.JSONResponse{
@@ -160,7 +161,7 @@ func storeFileAndMetadata(tmpDir types.Path, absBasePath types.Path, mediaMetadata *types.MediaMetadata, db *storage.Database, logger *log.Entry) *util.JSONResponse {
     // there is valid metadata in the database for that file. As such we only
     // remove the file if it is not a duplicate.
     if duplicate == false {
-        removeDir(types.Path(path.Dir(finalPath)), logger)
+        fileutils.RemoveDir(types.Path(path.Dir(finalPath)), logger)
     }
     return &util.JSONResponse{
         Code: 400,
@@ -197,18 +198,18 @@ func Upload(req *http.Request, cfg *config.MediaAPI, db *storage.Database) util.JSONResponse {
     // method of deduplicating files to save storage, as well as a way to conduct
     // integrity checks on the file data in the repository.
     // bytesWritten is the total number of bytes written to disk
-    hash, _, bytesWritten, tmpDir, copyError := readAndHashAndWriteWithLimit(req.Body, cfg.MaxFileSizeBytes, cfg.AbsBasePath, nil)
+    hash, _, bytesWritten, tmpDir, copyError := fileutils.ReadAndHashAndWriteWithLimit(req.Body, cfg.MaxFileSizeBytes, cfg.AbsBasePath, nil)

     if copyError != nil {
         logFields := log.Fields{
             "Origin":  r.MediaMetadata.Origin,
             "MediaID": r.MediaMetadata.MediaID,
         }
-        if copyError == errFileIsTooLarge {
+        if copyError == fileutils.ErrFileIsTooLarge {
             logFields["MaxFileSizeBytes"] = cfg.MaxFileSizeBytes
         }
         logger.WithError(copyError).WithFields(logFields).Warn("Error while transferring file")
-        removeDir(tmpDir, logger)
+        fileutils.RemoveDir(tmpDir, logger)
         return util.JSONResponse{
             Code: 400,
             JSON: jsonerror.Unknown(fmt.Sprintf("Failed to upload")),
@@ -233,7 +234,7 @@ func Upload(req *http.Request, cfg *config.MediaAPI, db *storage.Database) util.JSONResponse {
     mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin)
     if err == nil {
         r.MediaMetadata = mediaMetadata
-        removeDir(tmpDir, logger)
+        fileutils.RemoveDir(tmpDir, logger)
         return util.JSONResponse{
             Code: 200,
             JSON: uploadResponse{
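Note: this last hunk is the upload-side dedup: if metadata for the hashed media already exists in the database, the temporary file is discarded and the stored metadata is returned with a 200. A toy sketch of that flow with a map standing in for the database (all names illustrative):

package main

import "fmt"

type mediaMetadata struct {
    MediaID       string
    ContentLength int64
}

// db stands in for the media repository database in this sketch.
var db = map[string]*mediaMetadata{
    "abc123": {MediaID: "abc123", ContentLength: 5},
}

// storeOrReuse returns existing metadata (a duplicate) when the hash is
// already known, and stores new metadata otherwise.
func storeOrReuse(hash string, length int64) (*mediaMetadata, bool) {
    if existing, ok := db[hash]; ok {
        // Duplicate upload: the caller discards its temp dir and reuses this.
        return existing, true
    }
    m := &mediaMetadata{MediaID: hash, ContentLength: length}
    db[hash] = m
    return m, false
}

func main() {
    m, dup := storeOrReuse("abc123", 5)
    fmt.Println(m.MediaID, dup) // abc123 true
}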