mediaapi: Hack in SQL db storage and Erik's gotest file upload code

After this, upload in a usual case now works but the code surely needs
cleanup.
This commit is contained in:
Robert Swain 2017-04-26 12:11:22 +02:00
parent 4d1bff2f61
commit d9ee22d043
7 changed files with 485 additions and 29 deletions

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/mediaapi/config"
"github.com/matrix-org/dendrite/mediaapi/routing"
"github.com/matrix-org/dendrite/mediaapi/storage"
log "github.com/Sirupsen/logrus"
)
@ -33,13 +34,10 @@ var (
func main() {
common.SetupLogging(logDir)
if bindAddr == "" {
log.Panic("No BIND_ADDRESS environment variable found.")
}
// db, err := storage.Open(database)
// if err != nil {
// panic(err)
// }
cfg := config.MediaAPI{
ServerName: "localhost",
@ -47,8 +45,18 @@ func main() {
DataSource: database,
}
db, err := storage.Open(cfg.DataSource)
if err != nil {
log.Panicln("Failed to open database:", err)
}
repo := &storage.Repository{
StorePrefix: cfg.BasePath,
MaxBytes: 61440,
}
log.Info("Starting mediaapi")
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg)
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, db, repo)
log.Fatal(http.ListenAndServe(bindAddr, nil))
}

View file

@ -19,6 +19,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/mediaapi/config"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/writers"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
@ -28,11 +29,11 @@ const pathPrefixR0 = "/_matrix/media/v1"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI) {
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.MediaAPI, db *storage.Database, repo *storage.Repository) {
apiMux := mux.NewRouter()
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/upload", make("upload", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
return writers.Upload(req, cfg)
return writers.Upload(req, cfg, db, repo)
})))
servMux.Handle("/metrics", prometheus.Handler())

View file

@ -0,0 +1,92 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"os"
log "github.com/Sirupsen/logrus"
)
// LimitedFileWriter writes only a limited number of bytes to a file.
//
// If the callee attempts to write more bytes the file is deleted and further
// writes are silently discarded.
//
// This isn't thread safe.
type LimitedFileWriter struct {
filePath string
file *os.File
writtenBytes uint64
maxBytes uint64
}
// NewLimitedFileWriter creates a new LimitedFileWriter at the given location.
//
// If a file already exists at the location it is immediately truncated.
//
// A maxBytes of 0 or negative is treated as no limit.
func NewLimitedFileWriter(filePath string, maxBytes uint64) (*LimitedFileWriter, error) {
file, err := os.Create(filePath)
if err != nil {
return nil, err
}
writer := LimitedFileWriter{
filePath: filePath,
file: file,
maxBytes: maxBytes,
}
return &writer, nil
}
// Close closes the underlying file descriptor, if its open.
//
// Any error comes from File.Close
func (writer *LimitedFileWriter) Close() error {
if writer.file != nil {
file := writer.file
writer.file = nil
return file.Close()
}
return nil
}
func (writer *LimitedFileWriter) Write(p []byte) (n int, err error) {
if writer.maxBytes > 0 && uint64(len(p))+writer.writtenBytes > writer.maxBytes {
if writer.file != nil {
writer.Close()
err = os.Remove(writer.filePath)
if err != nil {
log.Printf("Failed to delete file %v\n", err)
}
}
return 0, fmt.Errorf("Reached limit")
}
if writer.file != nil {
n, err = writer.file.Write(p)
writer.writtenBytes += uint64(n)
if err != nil {
log.Printf("Failed to write to file %v\n", err)
}
}
return
}

View file

@ -16,17 +16,38 @@ package storage
import (
"database/sql"
"time"
)
// mediaSchema creates the media_repository table, which holds one row of
// metadata per media upload; the actual file content is stored on disk.
const mediaSchema = `
-- The media_repository table holds metadata for each media upload to the local server,
-- the actual file is stored separately.
CREATE TABLE IF NOT EXISTS media_repository (
    -- The id used to refer to the media.
    -- This is a base64-encoded sha256 hash of the file data
    media_id TEXT PRIMARY KEY,
    -- The origin of the media as requested by the client.
    media_origin TEXT NOT NULL,
    -- The MIME-type of the media file.
    content_type TEXT NOT NULL,
    -- The HTTP Content-Disposition header for the media file.
    content_disposition TEXT NOT NULL DEFAULT 'inline',
    -- Size of the media file in bytes.
    file_size BIGINT NOT NULL,
    -- When the content was uploaded in ms.
    created_ts BIGINT NOT NULL,
    -- The name with which the media was uploaded.
    upload_name TEXT NOT NULL,
    -- The user who uploaded the file.
    user_id TEXT NOT NULL,
    UNIQUE(media_id, media_origin)
);
`
// insertMediaSQL inserts one metadata row for an uploaded piece of media.
// created_ts is supplied by the caller in milliseconds since the epoch.
const insertMediaSQL = `
INSERT INTO media_repository (media_id, media_origin, content_type, content_disposition, file_size, created_ts, upload_name, user_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
type mediaStatements struct {
insertMediaStmt *sql.Stmt
@ -43,5 +64,11 @@ func (s *mediaStatements) prepare(db *sql.DB) (err error) {
}.prepare(db)
}
func (s *mediaStatements) insertMedia() {
func (s *mediaStatements) insertMedia(mediaID string, mediaOrigin string, contentType string,
contentDisposition string, fileSize int64, uploadName string, userID string) error {
_, err := s.insertMediaStmt.Exec(
mediaID, mediaOrigin, contentType, contentDisposition, fileSize,
int64(time.Now().UnixNano()/1000000), uploadName, userID,
)
return err
}

View file

@ -0,0 +1,283 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"crypto/sha256"
"encoding/base64"
"hash"
"io"
"io/ioutil"
"os"
"path"
log "github.com/Sirupsen/logrus"
)
// Description contains various attributes for an image.
type Description struct {
	Type   string // MIME type of the content, taken from the upload's Content-Type
	Length int64  // size of the content in bytes
}

// repositoryPaths holds the on-disk locations of a media item's data and
// metadata inside one repository directory.
type repositoryPaths struct {
	contentPath string // file holding the raw content bytes
	typePath    string // file holding the content's MIME type
}

// Repository stores locally uploaded media, and caches remote media that has
// been requested.
type Repository struct {
	StorePrefix string // root directory under which all media is stored
	MaxBytes    uint64 // maximum size of a single item; 0 means no limit
}
// ReaderFromRemoteCache returns an io.ReadCloser for remote content previously
// cached under host/name, along with its Description. Use IsNotExists to check
// whether an error means the content is simply absent from the cache.
func (repo Repository) ReaderFromRemoteCache(host, name string) (io.ReadCloser, *Description, error) {
	return repo.readerFromRepository(getPathsForMedia(repo.getDirForRemoteMedia(host, name)))
}

// ReaderFromLocalRepo returns an io.ReadCloser for locally uploaded content
// stored under name, along with its Description. Use IsNotExists to check
// whether an error means the content does not exist.
func (repo Repository) ReaderFromLocalRepo(name string) (io.ReadCloser, *Description, error) {
	return repo.readerFromRepository(getPathsForMedia(repo.getDirForLocalMedia(name)))
}
// readerFromRepository opens the content file named by repoPaths and returns
// it together with a Description built from the stored MIME type file and the
// content file's size. The caller is responsible for closing the returned
// reader.
func (repo Repository) readerFromRepository(repoPaths repositoryPaths) (io.ReadCloser, *Description, error) {
	contentTypeBytes, err := ioutil.ReadFile(repoPaths.typePath)
	if err != nil {
		return nil, nil, err
	}
	contentType := string(contentTypeBytes)
	file, err := os.Open(repoPaths.contentPath)
	if err != nil {
		return nil, nil, err
	}
	stat, err := file.Stat()
	if err != nil {
		// Don't leak the file descriptor on the failure path.
		file.Close()
		return nil, nil, err
	}
	descr := Description{
		Type:   contentType,
		Length: stat.Size(),
	}
	return file, &descr, nil
}
// WriterToLocalRepository returns a RepositoryWriter for writing newly uploaded
// content into the repository.
//
// The returned RepositoryWriter's Write fails once more than MaxBytes have
// been written in total (see LimitedFileWriter).
func (repo Repository) WriterToLocalRepository(descr Description) (RepositoryWriter, error) {
	return newLocalRepositoryWriter(repo, descr)
}

// WriterToRemoteCache returns a RepositoryWriter for caching newly downloaded
// remote content.
//
// Caching is best-effort: the returned RepositoryWriter silently stops writing
// if more than MaxBytes tries to be written and does *not* return an error.
func (repo Repository) WriterToRemoteCache(host, name string, descr Description) (RepositoryWriter, error) {
	return newRemoteRepositoryWriter(repo, host, name, descr)
}
// makeTempDir ensures the repository's "tmp" directory exists and returns a
// fresh, uniquely named temporary directory inside it.
func (repo *Repository) makeTempDir() (string, error) {
	tmpDir := path.Join(repo.StorePrefix, "tmp")
	// The error was previously ignored; report it so a missing/unwritable
	// store prefix surfaces here rather than as a confusing TempDir failure.
	if err := os.MkdirAll(tmpDir, 0770); err != nil {
		return "", err
	}
	return ioutil.TempDir(tmpDir, "")
}
// getDirForLocalMedia returns the directory for locally uploaded media with
// the given name, sharded by the name's first 3 characters.
// NOTE(review): assumes len(name) >= 3 (names appear to be base64 SHA-256
// hashes) — a shorter name would panic on the slice; confirm callers
// guarantee this.
func (repo *Repository) getDirForLocalMedia(name string) string {
	return path.Join(repo.StorePrefix, "local", name[:3], name[3:])
}

// getDirForRemoteMedia returns the cache directory for remote media fetched
// from host, sharded by the sanitized name's first 3 characters.
// NOTE(review): assumes len(sanitizedName) >= 3 — confirm callers guarantee
// this.
func (repo *Repository) getDirForRemoteMedia(host, sanitizedName string) string {
	return path.Join(repo.StorePrefix, "remote", host, sanitizedName[:3], sanitizedName[3:])
}
// getPathsForMedia returns the locations of the data ("content") and MIME
// type metadata ("type") files inside the given media directory.
func getPathsForMedia(dir string) repositoryPaths {
	return repositoryPaths{
		contentPath: path.Join(dir, "content"),
		typePath:    path.Join(dir, "type"),
	}
}
// IsNotExists check if error was due to content not existing in cache.
func IsNotExists(err error) bool { return os.IsNotExist(err) }
// RepositoryWriter is used to either store into the repository newly uploaded
// media or to cache recently fetched remote media.
type RepositoryWriter interface {
	io.WriteCloser
	// Finished should be called when successfully finished writing; otherwise
	// the written content will not be committed to the repository. It returns
	// the name under which the content was (or would have been) stored.
	Finished() (string, error)
}
// remoteRepositoryWriter caches remote content as it is downloaded. Writing
// is best-effort: underlying failures are recorded in erred but never
// surfaced to callers (see Write).
type remoteRepositoryWriter struct {
	tmpDir   string         // temporary directory content is written into
	finalDir string         // cache directory the content is moved to on success
	name     string         // media name returned by Finished
	file     io.WriteCloser // size-limited writer for the content file
	erred    bool           // set once an underlying write has failed
}

// newRemoteRepositoryWriter prepares a temporary directory (with the MIME
// type from descr already stored) for caching content fetched from host under
// the given name.
func newRemoteRepositoryWriter(repo Repository, host, name string, descr Description) (*remoteRepositoryWriter, error) {
	tmpFile, tmpDir, err := getTempWriter(repo, descr)
	if err != nil {
		log.Printf("Failed to create writer: %v\n", err)
		return nil, err
	}
	return &remoteRepositoryWriter{
		tmpDir:   tmpDir,
		finalDir: repo.getDirForRemoteMedia(host, name),
		name:     name,
		file:     tmpFile,
		erred:    false,
	}, nil
}
// Write copies p into the temporary cache file, always reporting success:
// caching remote content is best-effort, so a failing underlying write is
// recorded in erred (consulted by Finished) and hidden from the layers above.
//
// The receiver must be a pointer: with the previous value receiver, erred was
// set on a copy and never persisted, so every subsequent Write kept retrying
// the broken file and Finished still committed the truncated content.
func (writer *remoteRepositoryWriter) Write(p []byte) (int, error) {
	if !writer.erred {
		if _, err := writer.file.Write(p); err != nil {
			writer.erred = true
		}
	}
	return len(p), nil
}
// Close removes the temporary directory and closes the underlying file.
// Cleanup is best-effort: errors from RemoveAll and the file's Close are
// deliberately discarded and nil is always returned.
func (writer remoteRepositoryWriter) Close() error {
	os.RemoveAll(writer.tmpDir)
	writer.file.Close()
	return nil
}
// Finished commits the cached content by renaming the temporary directory to
// its final cache location and returns the media name.
//
// If an earlier write failed (erred), the content is NOT committed, but — in
// line with the best-effort contract of WriterToRemoteCache — no error is
// reported for that case; the name is still returned. Close is always called,
// which cleans up the temporary directory (a no-op after a successful rename).
func (writer remoteRepositoryWriter) Finished() (string, error) {
	var err error
	if !writer.erred {
		os.MkdirAll(path.Dir(writer.finalDir), 0770)
		err = os.Rename(writer.tmpDir, writer.finalDir)
		if err != nil {
			return "", err
		}
	}
	// Close never returns a non-nil error for this type.
	err = writer.Close()
	return writer.name, err
}
// localRepositoryWriter stores newly uploaded content, hashing it as it is
// written so the final media name can be derived from the content itself.
type localRepositoryWriter struct {
	repo     Repository     // repository the content is committed into
	tmpDir   string         // temporary directory content is written into
	hasher   hash.Hash      // running SHA-256 of everything written
	file     io.WriteCloser // size-limited writer for the content file
	finished bool           // set once Finished has committed the content
}

// newLocalRepositoryWriter prepares a temporary directory (with the MIME type
// from descr already stored) for a new local upload.
func newLocalRepositoryWriter(repo Repository, descr Description) (*localRepositoryWriter, error) {
	tmpFile, tmpDir, err := getTempWriter(repo, descr)
	if err != nil {
		return nil, err
	}
	return &localRepositoryWriter{
		repo:     repo,
		tmpDir:   tmpDir,
		hasher:   sha256.New(),
		file:     tmpFile,
		finished: false,
	}, nil
}
// Write feeds p into the running SHA-256 hash and appends it to the temporary
// content file. On a write error the writer is closed (removing the temporary
// directory, since finished is false) and the error is returned.
func (writer localRepositoryWriter) Write(p []byte) (int, error) {
	writer.hasher.Write(p) // Never errors.
	n, err := writer.file.Write(p)
	if err != nil {
		writer.Close()
	}
	return n, err
}
// Close cleans up: unless Finished has committed the content, the temporary
// directory is removed; the underlying file is then closed.
//
// NOTE(review): finished is only ever set inside Finished; verify the
// receiver kinds keep that flag visible here, otherwise a deferred Close
// after a successful commit will re-run the (no-op) RemoveAll and close the
// file a second time.
func (writer localRepositoryWriter) Close() error {
	var err error
	if !writer.finished {
		err = os.RemoveAll(writer.tmpDir)
		if err != nil {
			return err
		}
	}
	err = writer.file.Close()
	return err
}
// Finished commits the uploaded content: the media name is derived from the
// SHA-256 of everything written (base64 URL encoding), the temporary
// directory is renamed into its final sharded location, and the name is
// returned.
//
// The receiver must be a pointer: with the previous value receiver the
// finished flag was set on a copy and never persisted, so a later (e.g.
// deferred) Close still saw finished == false and re-closed the file.
func (writer *localRepositoryWriter) Finished() (string, error) {
	// Renamed from "hash" to avoid shadowing the hash package import.
	sum := writer.hasher.Sum(nil)
	name := base64.URLEncoding.EncodeToString(sum)
	finalDir := writer.repo.getDirForLocalMedia(name)
	os.MkdirAll(path.Dir(finalDir), 0770)
	if err := os.Rename(writer.tmpDir, finalDir); err != nil {
		log.Println("Failed to move temp directory:", writer.tmpDir, finalDir, err)
		return "", err
	}
	writer.finished = true
	writer.Close()
	return name, nil
}
// getTempWriter creates a temporary directory for a new media item, stores
// the MIME type metadata (from descr) inside it, and returns a size-limited
// writer for the content file together with the temporary directory's path.
//
// On failure after the directory has been created it is removed again, so
// errors no longer leak temp directories on disk.
func getTempWriter(repo Repository, descr Description) (io.WriteCloser, string, error) {
	tmpDir, err := repo.makeTempDir()
	if err != nil {
		log.Printf("Failed to create temp dir: %v\n", err)
		return nil, "", err
	}
	repoPaths := getPathsForMedia(tmpDir)
	if err = ioutil.WriteFile(repoPaths.typePath, []byte(descr.Type), 0660); err != nil {
		log.Printf("Failed to create typeFile: %v\n", err)
		os.RemoveAll(tmpDir)
		return nil, "", err
	}
	tmpFile, err := NewLimitedFileWriter(repoPaths.contentPath, repo.MaxBytes)
	if err != nil {
		log.Printf("Failed to create limited file: %v\n", err)
		os.RemoveAll(tmpDir)
		return nil, "", err
	}
	return tmpFile, tmpDir, nil
}

View file

@ -16,6 +16,9 @@ package storage
import (
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
)
// A Database is used to store room events and stream offsets.
@ -36,3 +39,8 @@ func Open(dataSourceName string) (*Database, error) {
}
return &d, nil
}
// CreateMedia inserts the metadata about the uploaded media into the database.
//
// mediaID is the identifier for the content (per the schema, a base64-encoded
// SHA-256 of the file data), mediaOrigin the server it was uploaded to, and
// fileSize its size in bytes. The creation timestamp is filled in by the
// statement layer.
func (d *Database) CreateMedia(mediaID string, mediaOrigin string, contentType string, contentDisposition string, fileSize int64, uploadName string, userID string) error {
	return d.statements.insertMedia(mediaID, mediaOrigin, contentType, contentDisposition, fileSize, uploadName, userID)
}

View file

@ -15,29 +15,34 @@
package writers
import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/mediaapi/config"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/util"
)
// UploadRequest metadata included in or derivable from an upload request
// https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-media-r0-upload
// NOTE: ContentType is an HTTP request header and Filename is passed as a query parameter
type UploadRequest struct {
	ContentDisposition string // HTTP Content-Disposition request header
	ContentLength      int64  // upload size in bytes, from the HTTP request
	ContentType        string // HTTP Content-Type request header
	Filename           string // "filename" query parameter
	Base64FileHash     string // name of the stored content, filled in after writing
	Method             string // HTTP method of the request
	UserID             string // ID of the uploading user
}
func (r uploadRequest) Validate() *util.JSONResponse {
// Validate validates the UploadRequest fields
func (r UploadRequest) Validate() *util.JSONResponse {
// TODO: Any validation to be done on ContentDisposition?
if r.ContentLength < 1 {
return &util.JSONResponse{
@ -88,7 +93,7 @@ type uploadResponse struct {
}
// Upload implements /upload
func Upload(req *http.Request, cfg config.MediaAPI) util.JSONResponse {
func Upload(req *http.Request, cfg config.MediaAPI, db *storage.Database, repo *storage.Repository) util.JSONResponse {
logger := util.GetLogger(req.Context())
// FIXME: This will require querying some other component/db but currently
@ -98,13 +103,9 @@ func Upload(req *http.Request, cfg config.MediaAPI) util.JSONResponse {
return *resErr
}
// req.Header.Get() returns "" if no header
// strconv.Atoi() returns 0 when parsing ""
contentLength, _ := strconv.Atoi(req.Header.Get("Content-Length"))
r := uploadRequest{
r := &UploadRequest{
ContentDisposition: req.Header.Get("Content-Disposition"),
ContentLength: contentLength,
ContentLength: req.ContentLength,
ContentType: req.Header.Get("Content-Type"),
Filename: req.FormValue("filename"),
Method: req.Method,
@ -126,14 +127,50 @@ func Upload(req *http.Request, cfg config.MediaAPI) util.JSONResponse {
// - progressive writing (could support Content-Length 0 and cut off
// after some max upload size is exceeded)
// - generate id (ideally a hash but a random string to start with)
// - generate thumbnails
// TODO: Write metadata to database
// TODO: Respond to request
writer, err := repo.WriterToLocalRepository(storage.Description{
Type: r.ContentType,
})
if err != nil {
logger.Infof("Failed to get cache writer %q\n", err)
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
}
}
defer writer.Close()
if _, err = io.Copy(writer, req.Body); err != nil {
logger.Infof("Failed to copy %q\n", err)
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
}
}
r.Base64FileHash, err = writer.Finished()
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
}
}
// TODO: check if file with hash already exists
// TODO: generate thumbnails
err = db.CreateMedia(r.Base64FileHash, cfg.ServerName, r.ContentType, r.ContentDisposition, r.ContentLength, r.Filename, r.UserID)
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON(fmt.Sprintf("Failed to upload: %q", err)),
}
}
return util.JSONResponse{
Code: 200,
JSON: uploadResponse{
ContentURI: "mxc://example.com/AQwafuaFswefuhsfAFAgsw",
ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.ServerName, r.Base64FileHash),
},
}
}