diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go
index 884eac80f..31d8bfb62 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go
@@ -21,6 +21,7 @@ import (
 	"github.com/matrix-org/dendrite/common"
 	"github.com/matrix-org/dendrite/mediaapi/config"
 	"github.com/matrix-org/dendrite/mediaapi/routing"
+	"github.com/matrix-org/dendrite/mediaapi/storage"
 
 	log "github.com/Sirupsen/logrus"
 )
@@ -33,13 +34,10 @@ var (
 
 func main() {
 	common.SetupLogging(logDir)
+
 	if bindAddr == "" {
 		log.Panic("No BIND_ADDRESS environment variable found.")
 	}
-	// db, err := storage.Open(database)
-	// if err != nil {
-	// 	panic(err)
-	// }
 
 	cfg := config.MediaAPI{
 		ServerName: "localhost",
@@ -47,8 +45,18 @@ func main() {
 		DataSource: database,
 	}
 
+	db, err := storage.Open(cfg.DataSource)
+	if err != nil {
+		log.Panicln("Failed to open database:", err)
+	}
+
+	repo := &storage.Repository{
+		StorePrefix: cfg.BasePath,
+		MaxBytes:    61440,
+	}
+
 	log.Info("Starting mediaapi")
 
-	routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg)
+	routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, db, repo)
 	log.Fatal(http.ListenAndServe(bindAddr, nil))
 }
diff --git a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go
index fd5ff7384..562dc8e84 100644
--- a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go
+++ b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go
@@ -19,6 +19,7 @@ import (
 
 	"github.com/gorilla/mux"
 	"github.com/matrix-org/dendrite/mediaapi/config"
+	"github.com/matrix-org/dendrite/mediaapi/storage"
 	"github.com/matrix-org/dendrite/mediaapi/writers"
 	"github.com/matrix-org/util"
 	"github.com/prometheus/client_golang/prometheus"
@@ -28,11 +29,11 @@ const pathPrefixR0 = "/_matrix/media/v1"
 
 // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
 // to clients which need to make outbound HTTP requests.
-func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI) {
+func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI, db *storage.Database, repo *storage.Repository) {
 	apiMux := mux.NewRouter()
 	r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
 	r0mux.Handle("/upload", make("upload", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
-		return writers.Upload(req, cfg)
+		return writers.Upload(req, cfg, db, repo)
 	})))
 
 	servMux.Handle("/metrics", prometheus.Handler())
diff --git a/src/github.com/matrix-org/dendrite/mediaapi/storage/fileio.go b/src/github.com/matrix-org/dendrite/mediaapi/storage/fileio.go
new file mode 100644
index 000000000..5bd87ff8c
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/mediaapi/storage/fileio.go
@@ -0,0 +1,92 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"fmt"
+	"os"
+
+	log "github.com/Sirupsen/logrus"
+)
+
+// LimitedFileWriter writes only a limited number of bytes to a file.
+//
+// If the caller attempts to write more than maxBytes, the file is deleted, an
+// error is returned, and any further writes are silently discarded.
+//
+// This is not safe for concurrent use.
+type LimitedFileWriter struct {
+	filePath     string
+	file         *os.File
+	writtenBytes uint64
+	maxBytes     uint64
+}
+
+// NewLimitedFileWriter creates a new LimitedFileWriter at the given location.
+//
+// If a file already exists at the location it is immediately truncated.
+//
+// A maxBytes of 0 is treated as no limit.
+func NewLimitedFileWriter(filePath string, maxBytes uint64) (*LimitedFileWriter, error) {
+	file, err := os.Create(filePath)
+	if err != nil {
+		return nil, err
+	}
+
+	writer := LimitedFileWriter{
+		filePath: filePath,
+		file:     file,
+		maxBytes: maxBytes,
+	}
+
+	return &writer, nil
+}
+
+// Close closes the underlying file descriptor, if it is open.
+//
+// Any error comes from File.Close.
+func (writer *LimitedFileWriter) Close() error {
+	if writer.file != nil {
+		file := writer.file
+		writer.file = nil
+		return file.Close()
+	}
+	return nil
+}
+
+func (writer *LimitedFileWriter) Write(p []byte) (n int, err error) {
+	if writer.maxBytes > 0 && uint64(len(p))+writer.writtenBytes > writer.maxBytes {
+		if writer.file != nil {
+			writer.Close()
+			err = os.Remove(writer.filePath)
+			if err != nil {
+				log.Printf("Failed to delete file: %v\n", err)
+			}
+		}
+
+		return 0, fmt.Errorf("reached file size limit")
+	}
+
+	if writer.file != nil {
+		n, err = writer.file.Write(p)
+		writer.writtenBytes += uint64(n)
+
+		if err != nil {
+			log.Printf("Failed to write to file: %v\n", err)
+		}
+	}
+
+	return
+}
diff --git a/src/github.com/matrix-org/dendrite/mediaapi/storage/media.go b/src/github.com/matrix-org/dendrite/mediaapi/storage/media.go
index f87401d73..b46ab8181 100644
--- a/src/github.com/matrix-org/dendrite/mediaapi/storage/media.go
+++ b/src/github.com/matrix-org/dendrite/mediaapi/storage/media.go
@@ -16,17 +16,38 @@ package storage
 
 import (
 	"database/sql"
+	"time"
 )
 
 const mediaSchema = `
+-- The media_repository table holds metadata for each media upload to the local server;
+-- the actual file is stored separately.
+CREATE TABLE IF NOT EXISTS media_repository (
+    -- The id used to refer to the media.
+    -- This is a base64-encoded sha256 hash of the file data.
+    media_id TEXT PRIMARY KEY,
+    -- The origin of the media as requested by the client.
+    media_origin TEXT NOT NULL,
+    -- The MIME-type of the media file.
+    content_type TEXT NOT NULL,
+    -- The HTTP Content-Disposition header for the media file.
+    content_disposition TEXT NOT NULL DEFAULT 'inline',
+    -- Size of the media file in bytes.
+    file_size BIGINT NOT NULL,
+    -- When the content was uploaded, in milliseconds since the Unix epoch.
+    created_ts BIGINT NOT NULL,
+    -- The name with which the media was uploaded.
+    upload_name TEXT NOT NULL,
+    -- The user who uploaded the file.
+    user_id TEXT NOT NULL,
+    UNIQUE(media_id, media_origin)
+);
 `
 
-const insertMediaSQL = "" +
-	"INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids)" +
-	" VALUES ($1, $2, $3, $4, $5, $6)" +
-	" ON CONFLICT ON CONSTRAINT event_id_unique" +
-	" DO NOTHING" +
-	" RETURNING event_nid, state_snapshot_nid"
+const insertMediaSQL = `
+INSERT INTO media_repository (media_id, media_origin, content_type, content_disposition, file_size, created_ts, upload_name, user_id)
+    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+`
 
 type mediaStatements struct {
 	insertMediaStmt *sql.Stmt
@@ -43,5 +64,11 @@ func (s *mediaStatements) prepare(db *sql.DB) (err error) {
 	}.prepare(db)
 }
 
-func (s *mediaStatements) insertMedia() {
+func (s *mediaStatements) insertMedia(mediaID string, mediaOrigin string, contentType string,
+	contentDisposition string, fileSize int64, uploadName string, userID string) error {
+	_, err := s.insertMediaStmt.Exec(
+		mediaID, mediaOrigin, contentType, contentDisposition, fileSize,
+		int64(time.Now().UnixNano()/1000000), uploadName, userID,
+	)
+	return err
 }
diff --git a/src/github.com/matrix-org/dendrite/mediaapi/storage/repository.go b/src/github.com/matrix-org/dendrite/mediaapi/storage/repository.go
new file mode 100644
index 000000000..2378646ae
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/mediaapi/storage/repository.go
@@ -0,0 +1,283 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"crypto/sha256"
+	"encoding/base64"
+	"hash"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+
+	log "github.com/Sirupsen/logrus"
+)
+
+// Description contains various attributes for a media file.
+type Description struct {
+	Type   string
+	Length int64
+}
+
+type repositoryPaths struct {
+	contentPath string
+	typePath    string
+}
+
+// Repository stores locally uploaded media and caches remote media that has
+// been requested.
+type Repository struct {
+	StorePrefix string
+	MaxBytes    uint64
+}
+
+// ReaderFromRemoteCache returns an io.ReadCloser with the cached remote content,
+// if it exists. Use IsNotExists to check whether the error was due to it not
+// existing in the cache.
+func (repo Repository) ReaderFromRemoteCache(host, name string) (io.ReadCloser, *Description, error) {
+	mediaDir := repo.getDirForRemoteMedia(host, name)
+	repoPaths := getPathsForMedia(mediaDir)
+
+	return repo.readerFromRepository(repoPaths)
+}
+
+// ReaderFromLocalRepo returns an io.ReadCloser with the locally uploaded content,
+// if it exists. Use IsNotExists to check whether the error was due to it not
+// existing in the repository.
+func (repo Repository) ReaderFromLocalRepo(name string) (io.ReadCloser, *Description, error) {
+	mediaDir := repo.getDirForLocalMedia(name)
+	repoPaths := getPathsForMedia(mediaDir)
+
+	return repo.readerFromRepository(repoPaths)
+}
+
+func (repo Repository) readerFromRepository(repoPaths repositoryPaths) (io.ReadCloser, *Description, error) {
+	contentTypeBytes, err := ioutil.ReadFile(repoPaths.typePath)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	contentType := string(contentTypeBytes)
+
+	file, err := os.Open(repoPaths.contentPath)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	stat, err := file.Stat()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	descr := Description{
+		Type:   contentType,
+		Length: stat.Size(),
+	}
+
+	return file, &descr, nil
+}
+
+// WriterToLocalRepository returns a RepositoryWriter for writing newly uploaded
+// content into the repository.
+//
+// The returned RepositoryWriter will fail once more than MaxBytes bytes have
+// been written.
+func (repo Repository) WriterToLocalRepository(descr Description) (RepositoryWriter, error) {
+	return newLocalRepositoryWriter(repo, descr)
+}
+
+// WriterToRemoteCache returns a RepositoryWriter for caching newly downloaded
+// remote content.
+//
+// The returned RepositoryWriter silently stops writing once more than MaxBytes
+// bytes have been written and does *not* return an error.
+func (repo Repository) WriterToRemoteCache(host, name string, descr Description) (RepositoryWriter, error) {
+	return newRemoteRepositoryWriter(repo, host, name, descr)
+}
+
+func (repo *Repository) makeTempDir() (string, error) {
+	tmpDir := path.Join(repo.StorePrefix, "tmp")
+	os.MkdirAll(tmpDir, 0770)
+	return ioutil.TempDir(tmpDir, "")
+}
+
+func (repo *Repository) getDirForLocalMedia(name string) string {
+	return path.Join(repo.StorePrefix, "local", name[:3], name[3:])
+}
+
+func (repo *Repository) getDirForRemoteMedia(host, sanitizedName string) string {
+	return path.Join(repo.StorePrefix, "remote", host, sanitizedName[:3], sanitizedName[3:])
+}
+
+// Get the actual paths for the content and metadata associated with a media item.
+func getPathsForMedia(dir string) repositoryPaths {
+	contentPath := path.Join(dir, "content")
+	typePath := path.Join(dir, "type")
+	return repositoryPaths{
+		contentPath: contentPath,
+		typePath:    typePath,
+	}
+}
+
+// IsNotExists checks whether an error was due to the content not existing in the repository.
+func IsNotExists(err error) bool { return os.IsNotExist(err) }
+
+// RepositoryWriter is used to either store into the repository newly uploaded
+// media or to cache recently fetched remote media.
+type RepositoryWriter interface {
+	io.WriteCloser
+
+	// Finished should be called when successfully finished writing; otherwise
+	// the written content will not be committed to the repository.
+	Finished() (string, error)
+}
+
+type remoteRepositoryWriter struct {
+	tmpDir   string
+	finalDir string
+	name     string
+	file     io.WriteCloser
+	erred    bool
+}
+
+func newRemoteRepositoryWriter(repo Repository, host, name string, descr Description) (*remoteRepositoryWriter, error) {
+	tmpFile, tmpDir, err := getTempWriter(repo, descr)
+	if err != nil {
+		log.Printf("Failed to create writer: %v\n", err)
+		return nil, err
+	}
+
+	return &remoteRepositoryWriter{
+		tmpDir:   tmpDir,
+		finalDir: repo.getDirForRemoteMedia(host, name),
+		name:     name,
+		file:     tmpFile,
+		erred:    false,
+	}, nil
+}
+
+func (writer *remoteRepositoryWriter) Write(p []byte) (int, error) {
+	// It's OK to fail when writing to the remote repo. We just hide the error
+	// from the layers above.
+	if !writer.erred {
+		if _, err := writer.file.Write(p); err != nil {
+			writer.erred = true
+		}
+	}
+	return len(p), nil
+}
+
+func (writer *remoteRepositoryWriter) Close() error {
+	os.RemoveAll(writer.tmpDir)
+	writer.file.Close()
+	return nil
+}
+
+func (writer *remoteRepositoryWriter) Finished() (string, error) {
+	var err error
+	if !writer.erred {
+		os.MkdirAll(path.Dir(writer.finalDir), 0770)
+		err = os.Rename(writer.tmpDir, writer.finalDir)
+		if err != nil {
+			return "", err
+		}
+	}
+	err = writer.Close()
+	return writer.name, err
+}
+
+type localRepositoryWriter struct {
+	repo     Repository
+	tmpDir   string
+	hasher   hash.Hash
+	file     io.WriteCloser
+	finished bool
+}
+
+func newLocalRepositoryWriter(repo Repository, descr Description) (*localRepositoryWriter, error) {
+	tmpFile, tmpDir, err := getTempWriter(repo, descr)
+	if err != nil {
+		return nil, err
+	}
+
+	return &localRepositoryWriter{
+		repo:     repo,
+		tmpDir:   tmpDir,
+		hasher:   sha256.New(),
+		file:     tmpFile,
+		finished: false,
+	}, nil
+}
+
+func (writer *localRepositoryWriter) Write(p []byte) (int, error) {
+	writer.hasher.Write(p) // Never errors.
+	n, err := writer.file.Write(p)
+	if err != nil {
+		writer.Close()
+	}
+	return n, err
+}
+
+func (writer *localRepositoryWriter) Close() error {
+	var err error
+	if !writer.finished {
+		err = os.RemoveAll(writer.tmpDir)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = writer.file.Close()
+	return err
+}
+
+func (writer *localRepositoryWriter) Finished() (string, error) {
+	sum := writer.hasher.Sum(nil)
+	name := base64.URLEncoding.EncodeToString(sum)
+	finalDir := writer.repo.getDirForLocalMedia(name)
+	os.MkdirAll(path.Dir(finalDir), 0770)
+	err := os.Rename(writer.tmpDir, finalDir)
+	if err != nil {
+		log.Println("Failed to move temp directory:", writer.tmpDir, finalDir, err)
+		return "", err
+	}
+	writer.finished = true
+	writer.Close()
+	return name, nil
+}
+
+func getTempWriter(repo Repository, descr Description) (io.WriteCloser, string, error) {
+	tmpDir, err := repo.makeTempDir()
+	if err != nil {
+		log.Printf("Failed to create temp dir: %v\n", err)
+		return nil, "", err
+	}
+
+	repoPaths := getPathsForMedia(tmpDir)
+
+	if err = ioutil.WriteFile(repoPaths.typePath, []byte(descr.Type), 0660); err != nil {
+		log.Printf("Failed to create type file: %v\n", err)
+		return nil, "", err
+	}
+
+	tmpFile, err := NewLimitedFileWriter(repoPaths.contentPath, repo.MaxBytes)
+	if err != nil {
+		log.Printf("Failed to create limited file: %v\n", err)
+		return nil, "", err
+	}
+
+	return tmpFile, tmpDir, nil
+}
diff --git a/src/github.com/matrix-org/dendrite/mediaapi/storage/storage.go b/src/github.com/matrix-org/dendrite/mediaapi/storage/storage.go
index 0c881a480..dc987e44d 100644
--- a/src/github.com/matrix-org/dendrite/mediaapi/storage/storage.go
+++ b/src/github.com/matrix-org/dendrite/mediaapi/storage/storage.go
@@ -16,6 +16,9 @@ package storage
 
 import (
 	"database/sql"
+
+	// Import the postgres database driver.
+	_ "github.com/lib/pq"
 )
 
 // A Database is used to store room events and stream offsets.
@@ -36,3 +39,8 @@
 	}
 	return &d, nil
 }
+
+// CreateMedia inserts the metadata about the uploaded media into the database.
+func (d *Database) CreateMedia(mediaID string, mediaOrigin string, contentType string, contentDisposition string, fileSize int64, uploadName string, userID string) error {
+	return d.statements.insertMedia(mediaID, mediaOrigin, contentType, contentDisposition, fileSize, uploadName, userID)
+}
diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go
index 0e0d16c8b..f68168840 100644
--- a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go
+++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go
@@ -15,29 +15,34 @@
 package writers
 
 import (
+	"fmt"
+	"io"
 	"net/http"
-	"strconv"
 	"strings"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/matrix-org/dendrite/clientapi/auth"
 	"github.com/matrix-org/dendrite/clientapi/jsonerror"
 	"github.com/matrix-org/dendrite/mediaapi/config"
+	"github.com/matrix-org/dendrite/mediaapi/storage"
 	"github.com/matrix-org/util"
 )
 
+// UploadRequest contains the metadata included in or derivable from an upload request.
 // https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload
 // NOTE: ContentType is an HTTP request header and Filename is passed as a query parameter
-type uploadRequest struct {
+type UploadRequest struct {
 	ContentDisposition string
-	ContentLength      int
+	ContentLength      int64
 	ContentType        string
 	Filename           string
+	Base64FileHash     string
 	Method             string
 	UserID             string
 }
 
-func (r uploadRequest) Validate() *util.JSONResponse {
+// Validate validates the UploadRequest fields.
+func (r UploadRequest) Validate() *util.JSONResponse {
 	// TODO: Any validation to be done on ContentDisposition?
 	if r.ContentLength < 1 {
 		return &util.JSONResponse{
@@ -88,7 +93,7 @@ type uploadResponse struct {
 }
 
 // Upload implements /upload
-func Upload(req *http.Request, cfg config.MediaAPI) util.JSONResponse {
+func Upload(req *http.Request, cfg config.MediaAPI, db *storage.Database, repo *storage.Repository) util.JSONResponse {
 	logger := util.GetLogger(req.Context())
 
 	// FIXME: This will require querying some other component/db but currently
@@ -98,13 +103,9 @@
 		return *resErr
 	}
 
-	// req.Header.Get() returns "" if no header
-	// strconv.Atoi() returns 0 when parsing ""
-	contentLength, _ := strconv.Atoi(req.Header.Get("Content-Length"))
-
-	r := uploadRequest{
+	r := &UploadRequest{
 		ContentDisposition: req.Header.Get("Content-Disposition"),
-		ContentLength:      contentLength,
+		ContentLength:      req.ContentLength,
 		ContentType:        req.Header.Get("Content-Type"),
 		Filename:           req.FormValue("filename"),
 		Method:             req.Method,
@@ -126,14 +127,50 @@
 	// - progressive writing (could support Content-Length 0 and cut off
 	//   after some max upload size is exceeded)
 	// - generate id (ideally a hash but a random string to start with)
-	// - generate thumbnails
-	// TODO: Write metadata to database
-	// TODO: Respond to request
+	writer, err := repo.WriterToLocalRepository(storage.Description{
+		Type: r.ContentType,
+	})
+	if err != nil {
+		logger.Infof("Failed to get repository writer %q\n", err)
+		return util.JSONResponse{
+			Code: 400,
+			JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
+		}
+	}
+
+	defer writer.Close()
+
+	if _, err = io.Copy(writer, req.Body); err != nil {
+		logger.Infof("Failed to copy request body %q\n", err)
+		return util.JSONResponse{
+			Code: 400,
+			JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
+		}
+	}
+
+	r.Base64FileHash, err = writer.Finished()
+	if err != nil {
+		return util.JSONResponse{
+			Code: 400,
+			JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
+		}
+	}
+	// TODO: check if file with hash already exists
+
+	// TODO: generate thumbnails
+
+	err = db.CreateMedia(r.Base64FileHash, cfg.ServerName, r.ContentType, r.ContentDisposition, r.ContentLength, r.Filename, r.UserID)
+	if err != nil {
+		return util.JSONResponse{
+			Code: 400,
+			JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
+		}
+	}
 
 	return util.JSONResponse{
 		Code: 200,
 		JSON: uploadResponse{
-			ContentURI: "mxc://example.com/AQwafuaFswefuhsfAFAgsw",
+			ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.ServerName, r.Base64FileHash),
 		},
 	}
 }
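Below is a minimal usage sketch (not part of the diff) showing how the pieces introduced above fit together: a storage.Repository writer streams content into the local repository, Finished returns the base64-encoded SHA-256 hash that doubles as the media ID, and Database.CreateMedia records the metadata, mirroring what writers.Upload does per request. The data source name, store path, filename and user ID here are illustrative assumptions only.

package main

import (
	"io"
	"log"
	"strings"

	"github.com/matrix-org/dendrite/mediaapi/storage"
)

func main() {
	// Open the metadata database (the DSN here is an illustrative assumption).
	db, err := storage.Open("postgres://dendrite:pass@localhost/mediaapi?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	// Repository rooted at an illustrative path, capped at 61440 bytes (60 KiB)
	// per file to match the MaxBytes used in main.go above.
	repo := &storage.Repository{StorePrefix: "/var/dendrite/media", MaxBytes: 61440}

	// Stream some content into the local repository via a RepositoryWriter.
	writer, err := repo.WriterToLocalRepository(storage.Description{Type: "text/plain"})
	if err != nil {
		log.Fatal(err)
	}
	defer writer.Close()

	body := "hello world"
	if _, err = io.Copy(writer, strings.NewReader(body)); err != nil {
		log.Fatal(err)
	}

	// Finished commits the temporary directory into the repository and returns
	// the base64-encoded SHA-256 of the content, used as the media ID.
	mediaID, err := writer.Finished()
	if err != nil {
		log.Fatal(err)
	}

	// Record the metadata row, as writers.Upload does after a successful write.
	err = db.CreateMedia(mediaID, "localhost", "text/plain", "inline",
		int64(len(body)), "hello.txt", "@alice:localhost")
	if err != nil {
		log.Fatal(err)
	}

	// Read the content back out of the repository.
	reader, descr, err := repo.ReaderFromLocalRepo(mediaID)
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()
	log.Printf("stored %d bytes of %s as %s", descr.Length, descr.Type, mediaID)
}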