From 330359e291d69512fed394764d4f94721980960f Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 26 Jul 2022 13:32:36 +0100 Subject: [PATCH] Better multi-part upload support --- VoidCat/Controllers/UploadController.cs | 45 +++++-- VoidCat/Model/IngressPayload.cs | 9 +- VoidCat/Services/Files/FileStorageStartup.cs | 6 +- VoidCat/Services/Files/S3FileStore.cs | 116 ++++++++++++++++--- VoidCat/spa/src/FileUpload.js | 16 +-- VoidCat/spa/src/RateCalculator.js | 4 + 6 files changed, 157 insertions(+), 39 deletions(-) diff --git a/VoidCat/Controllers/UploadController.cs b/VoidCat/Controllers/UploadController.cs index feb590d..7bd61db 100644 --- a/VoidCat/Controllers/UploadController.cs +++ b/VoidCat/Controllers/UploadController.cs @@ -80,7 +80,7 @@ namespace VoidCat.Controllers store = user.Storage!; } } - + var meta = new SecretVoidFileMeta { MimeType = mime, @@ -91,11 +91,8 @@ namespace VoidCat.Controllers Storage = store }; - var digest = Request.Headers.GetHeader("V-Digest"); - var vf = await _storage.Ingress(new(Request.Body, meta) - { - Hash = digest - }, HttpContext.RequestAborted); + var (segment, totalSegments) = ParseSegmentsHeader(); + var vf = await _storage.Ingress(new(Request.Body, meta, segment, totalSegments), HttpContext.RequestAborted); // save metadata await _metadata.Set(vf.Id, vf.Metadata!); @@ -147,14 +144,20 @@ namespace VoidCat.Controllers var meta = await _metadata.Get(gid); if (meta == default) return UploadResult.Error("File not found"); - var editSecret = Request.Headers.GetHeader("V-EditSecret"); - var digest = Request.Headers.GetHeader("V-Digest"); - var vf = await _storage.Ingress(new(Request.Body, meta) + // Parse V-Segment header + var (segment, totalSegments) = ParseSegmentsHeader(); + + // sanity check for append operations + if (segment <= 1 || totalSegments <= 1) + { + return UploadResult.Error("Malformed request, segment must be > 1 for append"); + } + + var editSecret = Request.Headers.GetHeader("V-EditSecret"); + var vf = await _storage.Ingress(new(Request.Body, meta, segment, totalSegments) { - Hash = digest, EditSecret = editSecret?.FromBase58Guid() ?? Guid.Empty, - Id = gid, - IsAppend = true + Id = gid }, HttpContext.RequestAborted); // update file size @@ -288,6 +291,24 @@ namespace VoidCat.Controllers await _metadata.Update(gid, fileMeta); return Ok(); } + + private (int Segment, int TotalSegments) ParseSegmentsHeader() + { + // Parse V-Segment header + int segment = 1, totalSegments = 1; + var segmentHeader = Request.Headers.GetHeader("V-Segment"); + if (!string.IsNullOrEmpty(segmentHeader)) + { + var split = segmentHeader.Split("/"); + if (split.Length == 2 && int.TryParse(split[0], out var a) && int.TryParse(split[1], out var b)) + { + segment = a; + totalSegments = b; + } + } + + return (segment, totalSegments); + } } [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)] diff --git a/VoidCat/Model/IngressPayload.cs b/VoidCat/Model/IngressPayload.cs index 67a9021..14671a7 100644 --- a/VoidCat/Model/IngressPayload.cs +++ b/VoidCat/Model/IngressPayload.cs @@ -1,10 +1,11 @@ namespace VoidCat.Model; -public sealed record IngressPayload(Stream InStream, SecretVoidFileMeta Meta) +public sealed record IngressPayload(Stream InStream, SecretVoidFileMeta Meta, int Segment, int TotalSegments) { public Guid Id { get; init; } = Guid.NewGuid(); public Guid? EditSecret { get; init; } - public string? Hash { get; init; } - - public bool IsAppend { get; init; } + + public bool IsAppend => Segment > 1 && IsMultipart; + + public bool IsMultipart => TotalSegments > 1; } \ No newline at end of file diff --git a/VoidCat/Services/Files/FileStorageStartup.cs b/VoidCat/Services/Files/FileStorageStartup.cs index 8d36c3c..347e2b1 100644 --- a/VoidCat/Services/Files/FileStorageStartup.cs +++ b/VoidCat/Services/Files/FileStorageStartup.cs @@ -17,8 +17,10 @@ public static class FileStorageStartup foreach (var s3 in settings.CloudStorage.S3 ?? Array.Empty()) { services.AddTransient((svc) => - new S3FileStore(s3, svc.GetRequiredService(), - svc.GetRequiredService())); + new S3FileStore(s3, + svc.GetRequiredService(), + svc.GetRequiredService(), + svc.GetRequiredService())); if (settings.MetadataStore == s3.Name) { diff --git a/VoidCat/Services/Files/S3FileStore.cs b/VoidCat/Services/Files/S3FileStore.cs index cab33ab..17b1ed8 100644 --- a/VoidCat/Services/Files/S3FileStore.cs +++ b/VoidCat/Services/Files/S3FileStore.cs @@ -1,4 +1,5 @@ -using Amazon.S3; +using System.Net; +using Amazon.S3; using Amazon.S3.Model; using VoidCat.Model; using VoidCat.Services.Abstractions; @@ -12,10 +13,12 @@ public class S3FileStore : StreamFileStore, IFileStore private readonly AmazonS3Client _client; private readonly S3BlobConfig _config; private readonly IAggregateStatsCollector _statsCollector; + private readonly ICache _cache; - public S3FileStore(S3BlobConfig settings, IAggregateStatsCollector stats, IFileInfoManager fileInfo) : base(stats) + public S3FileStore(S3BlobConfig settings, IAggregateStatsCollector stats, IFileInfoManager fileInfo, ICache cache) : base(stats) { _fileInfo = fileInfo; + _cache = cache; _statsCollector = stats; _config = settings; _client = _config.CreateClient(); @@ -27,23 +30,18 @@ public class S3FileStore : StreamFileStore, IFileStore /// public async ValueTask Ingress(IngressPayload payload, CancellationToken cts) { - if (payload.IsAppend) throw new InvalidOperationException("Cannot append to S3 store"); + if (payload.IsMultipart) return await IngressMultipart(payload, cts); var req = new PutObjectRequest { BucketName = _config.BucketName, Key = payload.Id.ToString(), InputStream = payload.InStream, - ContentType = payload.Meta.MimeType ?? "application/octet-stream", + ContentType = "application/octet-stream", AutoResetStreamPosition = false, AutoCloseStream = false, ChecksumAlgorithm = ChecksumAlgorithm.SHA256, - ChecksumSHA256 = payload.Hash != default ? Convert.ToBase64String(payload.Hash!.FromHex()) : null, - StreamTransferProgress = (s, e) => - { - _statsCollector.TrackIngress(payload.Id, (ulong)e.IncrementTransferred) - .GetAwaiter().GetResult(); - }, + ChecksumSHA256 = payload.Meta.Digest != default ? Convert.ToBase64String(payload.Meta.Digest!.FromHex()) : null, Headers = { ContentLength = (long)payload.Meta.Size @@ -51,6 +49,7 @@ public class S3FileStore : StreamFileStore, IFileStore }; await _client.PutObjectAsync(req, cts); + await _statsCollector.TrackIngress(payload.Id, payload.Meta.Size); return HandleCompletedUpload(payload, payload.Meta.Size); } @@ -62,18 +61,24 @@ public class S3FileStore : StreamFileStore, IFileStore } /// - public ValueTask StartEgress(EgressRequest request) + public async ValueTask StartEgress(EgressRequest request) { - if (!_config.Direct) return ValueTask.FromResult(new EgressResult()); + if (!_config.Direct) return new(); + var meta = await _fileInfo.Get(request.Id); var url = _client.GetPreSignedURL(new() { BucketName = _config.BucketName, Expires = DateTime.UtcNow.AddHours(1), - Key = request.Id.ToString() + Key = request.Id.ToString(), + ResponseHeaderOverrides = new() + { + ContentDisposition = $"inline; filename=\"{meta?.Metadata?.Name}\"", + ContentType = meta?.Metadata?.MimeType + } }); - return ValueTask.FromResult(new EgressResult(new Uri(url))); + return new(new Uri(url)); } public async ValueTask> ListFiles(PagedRequest request) @@ -155,4 +160,87 @@ public class S3FileStore : StreamFileStore, IFileStore var obj = await _client.GetObjectAsync(req, cts); return obj.ResponseStream; } + + private async Task IngressMultipart(IngressPayload payload, CancellationToken cts) + { + string? uploadId; + var cacheKey = $"s3:{_config.Name}:multipart-upload-id:{payload.Id}"; + var partsCacheKey = $"s3:{_config.Name}:multipart-upload:{payload.Id}"; + + if (payload.Segment == 1) + { + var mStart = new InitiateMultipartUploadRequest() + { + BucketName = _config.BucketName, + Key = payload.Id.ToString(), + ContentType = "application/octet-stream", + ChecksumAlgorithm = ChecksumAlgorithm.SHA256 + }; + + var mStartResult = await _client.InitiateMultipartUploadAsync(mStart, cts); + uploadId = mStartResult.UploadId; + await _cache.Set(cacheKey, uploadId, TimeSpan.FromHours(1)); + } + else + { + uploadId = await _cache.Get(cacheKey); + } + + // sadly it seems like we need a tmp file here + var tmpFile = Path.GetTempFileName(); + await using var fsTmp = new FileStream(tmpFile, FileMode.Create, FileAccess.ReadWrite); + await payload.InStream.CopyToAsync(fsTmp, cts); + fsTmp.Seek(0, SeekOrigin.Begin); + + var segmentLength = (ulong)fsTmp.Length; + var mbody = new UploadPartRequest() + { + UploadId = uploadId, + BucketName = _config.BucketName, + PartNumber = payload.Segment, + Key = payload.Id.ToString(), + InputStream = fsTmp + }; + + var bodyResponse = await _client.UploadPartAsync(mbody, cts); + if (bodyResponse.HttpStatusCode != HttpStatusCode.OK) + { + await _client.AbortMultipartUploadAsync(new() + { + BucketName = _config.BucketName, + UploadId = uploadId + }, cts); + + throw new Exception("Upload aborted"); + } + + await _statsCollector.TrackIngress(payload.Id, segmentLength); + await _cache.AddToList(partsCacheKey, $"{payload.Segment}|{bodyResponse.ETag.Replace("\"", string.Empty)}"); + if (payload.Segment == payload.TotalSegments) + { + var parts = await _cache.GetList(partsCacheKey); + var completeResponse = await _client.CompleteMultipartUploadAsync(new() + { + BucketName = _config.BucketName, + Key = payload.Id.ToString(), + UploadId = uploadId, + PartETags = parts.Select(a => + { + var pSplit = a.Split('|'); + return new PartETag() + { + PartNumber = int.Parse(pSplit[0]), + ETag = pSplit[1] + }; + }).ToList() + }, cts); + + if (completeResponse.HttpStatusCode != HttpStatusCode.OK) + { + throw new Exception("Upload failed"); + } + } + + return HandleCompletedUpload(payload, segmentLength); + } } diff --git a/VoidCat/spa/src/FileUpload.js b/VoidCat/spa/src/FileUpload.js index 4dff93e..88004a3 100644 --- a/VoidCat/spa/src/FileUpload.js +++ b/VoidCat/spa/src/FileUpload.js @@ -87,11 +87,11 @@ export function FileUpload(props) { * @param id {string} * @param editSecret {string?} * @param fullDigest {string?} Full file hash + * @param part {int?} Segment number + * @param partOf {int?} Total number of segments * @returns {Promise} */ - async function xhrSegment(segment, id, editSecret, fullDigest) { - setUState(UploadState.Hashing); - const digest = await crypto.subtle.digest(DigestAlgo, segment); + async function xhrSegment(segment, id, editSecret, fullDigest, part, partOf) { setUState(UploadState.Uploading); return await new Promise((resolve, reject) => { @@ -114,10 +114,10 @@ export function FileUpload(props) { req.upload.onprogress = handleProgress; req.open("POST", typeof (id) === "string" ? `${ApiHost}/upload/${id}` : `${ApiHost}/upload`); req.setRequestHeader("Content-Type", "application/octet-stream"); - req.setRequestHeader("V-Content-Type", props.file.type); + req.setRequestHeader("V-Content-Type", props.file.type.length === 0 ? "application/octet-stream" : props.file.type); req.setRequestHeader("V-Filename", props.file.name); - req.setRequestHeader("V-Digest", buf2hex(digest)); req.setRequestHeader("V-Full-Digest", fullDigest); + req.setRequestHeader("V-Segment", `${part}/${partOf}`) if (auth) { req.setRequestHeader("Authorization", `Bearer ${auth}`); } @@ -136,14 +136,16 @@ export function FileUpload(props) { // upload file in segments of 50MB const UploadSize = 50_000_000; + setUState(UploadState.Hashing); let digest = await crypto.subtle.digest(DigestAlgo, await props.file.arrayBuffer()); let xhr = null; - const segments = props.file.size / UploadSize; + const segments = Math.ceil(props.file.size / UploadSize); for (let s = 0; s < segments; s++) { + calc.ResetLastLoaded(); let offset = s * UploadSize; let slice = props.file.slice(offset, offset + UploadSize, props.file.type); let segment = await slice.arrayBuffer(); - xhr = await xhrSegment(segment, xhr?.file?.id, xhr?.file?.metadata?.editSecret, buf2hex(digest)); + xhr = await xhrSegment(segment, xhr?.file?.id, xhr?.file?.metadata?.editSecret, buf2hex(digest), s + 1, segments); if (!xhr.ok) { break; } diff --git a/VoidCat/spa/src/RateCalculator.js b/VoidCat/spa/src/RateCalculator.js index 69effb0..b4a8495 100644 --- a/VoidCat/spa/src/RateCalculator.js +++ b/VoidCat/spa/src/RateCalculator.js @@ -4,6 +4,10 @@ export class RateCalculator { this.lastLoaded = 0; } + ResetLastLoaded() { + this.lastLoaded = 0; + } + ReportProgress(amount) { this.reports.push({ time: new Date().getTime(),