Better multi-part upload support

Kieran 2022-07-26 13:32:36 +01:00
parent ad59bdd96c
commit 330359e291
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
6 changed files with 157 additions and 39 deletions

View File

@@ -91,11 +91,8 @@ namespace VoidCat.Controllers
                 Storage = store
             };
 
-            var digest = Request.Headers.GetHeader("V-Digest");
-            var vf = await _storage.Ingress(new(Request.Body, meta)
-            {
-                Hash = digest
-            }, HttpContext.RequestAborted);
+            var (segment, totalSegments) = ParseSegmentsHeader();
+            var vf = await _storage.Ingress(new(Request.Body, meta, segment, totalSegments), HttpContext.RequestAborted);
 
             // save metadata
             await _metadata.Set(vf.Id, vf.Metadata!);
@@ -147,14 +144,20 @@ namespace VoidCat.Controllers
             var meta = await _metadata.Get<SecretVoidFileMeta>(gid);
             if (meta == default) return UploadResult.Error("File not found");
 
-            var editSecret = Request.Headers.GetHeader("V-EditSecret");
-            var digest = Request.Headers.GetHeader("V-Digest");
-            var vf = await _storage.Ingress(new(Request.Body, meta)
+            // Parse V-Segment header
+            var (segment, totalSegments) = ParseSegmentsHeader();
+
+            // sanity check for append operations
+            if (segment <= 1 || totalSegments <= 1)
+            {
+                return UploadResult.Error("Malformed request, segment must be > 1 for append");
+            }
+
+            var editSecret = Request.Headers.GetHeader("V-EditSecret");
+            var vf = await _storage.Ingress(new(Request.Body, meta, segment, totalSegments)
             {
-                Hash = digest,
                 EditSecret = editSecret?.FromBase58Guid() ?? Guid.Empty,
-                Id = gid,
-                IsAppend = true
+                Id = gid
             }, HttpContext.RequestAborted);
 
             // update file size
@@ -288,6 +291,24 @@ namespace VoidCat.Controllers
             await _metadata.Update(gid, fileMeta);
             return Ok();
         }
+
+        private (int Segment, int TotalSegments) ParseSegmentsHeader()
+        {
+            // Parse V-Segment header
+            int segment = 1, totalSegments = 1;
+            var segmentHeader = Request.Headers.GetHeader("V-Segment");
+            if (!string.IsNullOrEmpty(segmentHeader))
+            {
+                var split = segmentHeader.Split("/");
+                if (split.Length == 2 && int.TryParse(split[0], out var a) && int.TryParse(split[1], out var b))
+                {
+                    segment = a;
+                    totalSegments = b;
+                }
+            }
+
+            return (segment, totalSegments);
+        }
     }
 
     [AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
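
Taken together, the controller now accepts a `V-Segment: <part>/<total>` header: the first segment POSTs to `/upload`, and every later segment POSTs to `/upload/{id}` with the `V-EditSecret` returned from the first call. A minimal client-side sketch of that protocol follows; the host URL, file name, and the JSON response shape (`file.id` / `file.metadata.editSecret`, inferred from the JS client changes further down in this commit) are assumptions, not part of the change itself.

    using System.Text.Json;

    // Sketch only: upload a file in 50 MB segments via the V-Segment protocol.
    // Headers and endpoints match the JS client below; host URL is assumed.
    const long SegmentSize = 50_000_000;
    using var http = new HttpClient { BaseAddress = new Uri("http://localhost:5000") };

    await using var fs = File.OpenRead("example.bin");
    var total = (int)Math.Ceiling(fs.Length / (double)SegmentSize);
    string? id = null, editSecret = null;

    for (var part = 1; part <= total; part++)
    {
        var buf = new byte[Math.Min(SegmentSize, fs.Length - fs.Position)];
        await fs.ReadExactlyAsync(buf); // .NET 7+; loop over ReadAsync on older runtimes

        using var req = new HttpRequestMessage(HttpMethod.Post, id is null ? "/upload" : $"/upload/{id}")
        {
            Content = new ByteArrayContent(buf)
        };
        req.Headers.Add("V-Segment", $"{part}/{total}");   // consumed by ParseSegmentsHeader()
        if (editSecret is not null)
            req.Headers.Add("V-EditSecret", editSecret);   // required by the append endpoint

        var rsp = await http.SendAsync(req);
        using var doc = JsonDocument.Parse(await rsp.Content.ReadAsStringAsync());
        var file = doc.RootElement.GetProperty("file");
        id ??= file.GetProperty("id").GetString();
        editSecret ??= file.GetProperty("metadata").GetProperty("editSecret").GetString();
    }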

View File

@@ -1,10 +1,11 @@
 namespace VoidCat.Model;
 
-public sealed record IngressPayload(Stream InStream, SecretVoidFileMeta Meta)
+public sealed record IngressPayload(Stream InStream, SecretVoidFileMeta Meta, int Segment, int TotalSegments)
 {
     public Guid Id { get; init; } = Guid.NewGuid();
     public Guid? EditSecret { get; init; }
-    public string? Hash { get; init; }
 
-    public bool IsAppend { get; init; }
+    public bool IsAppend => Segment > 1 && IsMultipart;
+
+    public bool IsMultipart => TotalSegments > 1;
 }
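
The record now derives its multipart state from the `(Segment, TotalSegments)` pair instead of carrying a caller-set `IsAppend` flag, so the three upload shapes fall out mechanically. A quick illustration (`stream` and `meta` assumed in scope):

    var whole  = new IngressPayload(stream, meta, 1, 1); // IsMultipart = false, IsAppend = false
    var first  = new IngressPayload(stream, meta, 1, 3); // IsMultipart = true,  IsAppend = false
    var middle = new IngressPayload(stream, meta, 2, 3); // IsMultipart = true,  IsAppend = true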

View File

@@ -17,8 +17,10 @@ public static class FileStorageStartup
             foreach (var s3 in settings.CloudStorage.S3 ?? Array.Empty<S3BlobConfig>())
             {
                 services.AddTransient<IFileStore>((svc) =>
-                    new S3FileStore(s3, svc.GetRequiredService<IAggregateStatsCollector>(),
-                        svc.GetRequiredService<IFileInfoManager>()));
+                    new S3FileStore(s3,
+                        svc.GetRequiredService<IAggregateStatsCollector>(),
+                        svc.GetRequiredService<IFileInfoManager>(),
+                        svc.GetRequiredService<ICache>()));
 
                 if (settings.MetadataStore == s3.Name)
                 {

View File

@@ -1,4 +1,5 @@
-using Amazon.S3;
+using System.Net;
+using Amazon.S3;
 using Amazon.S3.Model;
 using VoidCat.Model;
 using VoidCat.Services.Abstractions;
@@ -12,10 +13,12 @@ public class S3FileStore : StreamFileStore, IFileStore
     private readonly AmazonS3Client _client;
     private readonly S3BlobConfig _config;
     private readonly IAggregateStatsCollector _statsCollector;
+    private readonly ICache _cache;
 
-    public S3FileStore(S3BlobConfig settings, IAggregateStatsCollector stats, IFileInfoManager fileInfo) : base(stats)
+    public S3FileStore(S3BlobConfig settings, IAggregateStatsCollector stats, IFileInfoManager fileInfo, ICache cache) : base(stats)
     {
         _fileInfo = fileInfo;
+        _cache = cache;
         _statsCollector = stats;
         _config = settings;
         _client = _config.CreateClient();
@@ -27,23 +30,18 @@ public class S3FileStore : StreamFileStore, IFileStore
     /// <inheritdoc />
     public async ValueTask<PrivateVoidFile> Ingress(IngressPayload payload, CancellationToken cts)
     {
-        if (payload.IsAppend) throw new InvalidOperationException("Cannot append to S3 store");
+        if (payload.IsMultipart) return await IngressMultipart(payload, cts);
 
         var req = new PutObjectRequest
         {
             BucketName = _config.BucketName,
             Key = payload.Id.ToString(),
             InputStream = payload.InStream,
-            ContentType = payload.Meta.MimeType ?? "application/octet-stream",
+            ContentType = "application/octet-stream",
             AutoResetStreamPosition = false,
             AutoCloseStream = false,
             ChecksumAlgorithm = ChecksumAlgorithm.SHA256,
-            ChecksumSHA256 = payload.Hash != default ? Convert.ToBase64String(payload.Hash!.FromHex()) : null,
-            StreamTransferProgress = (s, e) =>
-            {
-                _statsCollector.TrackIngress(payload.Id, (ulong)e.IncrementTransferred)
-                    .GetAwaiter().GetResult();
-            },
+            ChecksumSHA256 = payload.Meta.Digest != default ? Convert.ToBase64String(payload.Meta.Digest!.FromHex()) : null,
             Headers =
             {
                 ContentLength = (long)payload.Meta.Size
@@ -51,6 +49,7 @@ public class S3FileStore : StreamFileStore, IFileStore
         };
 
         await _client.PutObjectAsync(req, cts);
+        await _statsCollector.TrackIngress(payload.Id, payload.Meta.Size);
         return HandleCompletedUpload(payload, payload.Meta.Size);
     }
 
@@ -62,18 +61,24 @@ public class S3FileStore : StreamFileStore, IFileStore
     }
 
     /// <inheritdoc />
-    public ValueTask<EgressResult> StartEgress(EgressRequest request)
+    public async ValueTask<EgressResult> StartEgress(EgressRequest request)
     {
-        if (!_config.Direct) return ValueTask.FromResult(new EgressResult());
+        if (!_config.Direct) return new();
 
+        var meta = await _fileInfo.Get(request.Id);
         var url = _client.GetPreSignedURL(new()
         {
             BucketName = _config.BucketName,
             Expires = DateTime.UtcNow.AddHours(1),
-            Key = request.Id.ToString()
+            Key = request.Id.ToString(),
+            ResponseHeaderOverrides = new()
+            {
+                ContentDisposition = $"inline; filename=\"{meta?.Metadata?.Name}\"",
+                ContentType = meta?.Metadata?.MimeType
+            }
         });
 
-        return ValueTask.FromResult(new EgressResult(new Uri(url)));
+        return new(new Uri(url));
     }
 
     public async ValueTask<PagedResult<PublicVoidFile>> ListFiles(PagedRequest request)
@@ -155,4 +160,87 @@ public class S3FileStore : StreamFileStore, IFileStore
         var obj = await _client.GetObjectAsync(req, cts);
         return obj.ResponseStream;
     }
+
+    private async Task<PrivateVoidFile> IngressMultipart(IngressPayload payload, CancellationToken cts)
+    {
+        string? uploadId;
+        var cacheKey = $"s3:{_config.Name}:multipart-upload-id:{payload.Id}";
+        var partsCacheKey = $"s3:{_config.Name}:multipart-upload:{payload.Id}";
+        if (payload.Segment == 1)
+        {
+            var mStart = new InitiateMultipartUploadRequest()
+            {
+                BucketName = _config.BucketName,
+                Key = payload.Id.ToString(),
+                ContentType = "application/octet-stream",
+                ChecksumAlgorithm = ChecksumAlgorithm.SHA256
+            };
+
+            var mStartResult = await _client.InitiateMultipartUploadAsync(mStart, cts);
+            uploadId = mStartResult.UploadId;
+            await _cache.Set(cacheKey, uploadId, TimeSpan.FromHours(1));
+        }
+        else
+        {
+            uploadId = await _cache.Get<string>(cacheKey);
+        }
+
+        // sadly it seems like we need a tmp file here
+        var tmpFile = Path.GetTempFileName();
+        await using var fsTmp = new FileStream(tmpFile, FileMode.Create, FileAccess.ReadWrite);
+        await payload.InStream.CopyToAsync(fsTmp, cts);
+        fsTmp.Seek(0, SeekOrigin.Begin);
+        var segmentLength = (ulong)fsTmp.Length;
+
+        var mbody = new UploadPartRequest()
+        {
+            UploadId = uploadId,
+            BucketName = _config.BucketName,
+            PartNumber = payload.Segment,
+            Key = payload.Id.ToString(),
+            InputStream = fsTmp
+        };
+
+        var bodyResponse = await _client.UploadPartAsync(mbody, cts);
+        if (bodyResponse.HttpStatusCode != HttpStatusCode.OK)
+        {
+            await _client.AbortMultipartUploadAsync(new()
+            {
+                BucketName = _config.BucketName,
+                UploadId = uploadId
+            }, cts);
+
+            throw new Exception("Upload aborted");
+        }
+
+        await _statsCollector.TrackIngress(payload.Id, segmentLength);
+        await _cache.AddToList(partsCacheKey, $"{payload.Segment}|{bodyResponse.ETag.Replace("\"", string.Empty)}");
+        if (payload.Segment == payload.TotalSegments)
+        {
+            var parts = await _cache.GetList(partsCacheKey);
+            var completeResponse = await _client.CompleteMultipartUploadAsync(new()
+            {
+                BucketName = _config.BucketName,
+                Key = payload.Id.ToString(),
+                UploadId = uploadId,
+                PartETags = parts.Select(a =>
+                {
+                    var pSplit = a.Split('|');
+                    return new PartETag()
+                    {
+                        PartNumber = int.Parse(pSplit[0]),
+                        ETag = pSplit[1]
+                    };
+                }).ToList()
+            }, cts);
+
+            if (completeResponse.HttpStatusCode != HttpStatusCode.OK)
+            {
+                throw new Exception("Upload failed");
+            }
+        }
+
+        return HandleCompletedUpload(payload, segmentLength);
+    }
 }
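
The multipart flow has to persist two pieces of state between HTTP requests, since each segment arrives as a separate upload: the S3 `UploadId` (cached for an hour under `s3:{name}:multipart-upload-id:{id}`) and the growing list of `"{part}|{etag}"` entries needed for `CompleteMultipartUpload`. A hypothetical in-memory stand-in for the four `ICache` operations this relies on (signatures inferred from the call sites above, not from VoidCat's real `ICache` interface) makes that contract explicit:

    using System.Collections.Concurrent;

    // Hypothetical sketch only: just enough of an ICache to run the multipart
    // flow on a single node. Set/Get hold the UploadId; AddToList/GetList
    // accumulate the "{part}|{etag}" entries. Expiry is ignored for brevity.
    public class InMemoryUploadState
    {
        private readonly ConcurrentDictionary<string, string> _values = new();
        private readonly ConcurrentDictionary<string, ConcurrentQueue<string>> _lists = new();

        public ValueTask Set(string key, string value, TimeSpan? expiry = null)
        {
            _values[key] = value;
            return ValueTask.CompletedTask;
        }

        public ValueTask<string?> Get(string key) =>
            ValueTask.FromResult(_values.TryGetValue(key, out var v) ? v : null);

        public ValueTask AddToList(string key, string value)
        {
            _lists.GetOrAdd(key, _ => new ConcurrentQueue<string>()).Enqueue(value);
            return ValueTask.CompletedTask;
        }

        public ValueTask<string[]> GetList(string key) =>
            ValueTask.FromResult(_lists.TryGetValue(key, out var l) ? l.ToArray() : Array.Empty<string>());
    }

Because segments of one upload may land on different instances behind a load balancer, the real cache should be shared (e.g. Redis) rather than per-process. Note also that S3 requires every part except the last to be at least 5 MiB, which the client's 50 MB segment size comfortably satisfies.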

View File

@@ -87,11 +87,11 @@ export function FileUpload(props) {
      * @param id {string}
      * @param editSecret {string?}
      * @param fullDigest {string?} Full file hash
+     * @param part {int?} Segment number
+     * @param partOf {int?} Total number of segments
      * @returns {Promise<any>}
      */
-    async function xhrSegment(segment, id, editSecret, fullDigest) {
-        setUState(UploadState.Hashing);
-        const digest = await crypto.subtle.digest(DigestAlgo, segment);
+    async function xhrSegment(segment, id, editSecret, fullDigest, part, partOf) {
         setUState(UploadState.Uploading);
 
         return await new Promise((resolve, reject) => {
@@ -114,10 +114,10 @@ export function FileUpload(props) {
             req.upload.onprogress = handleProgress;
             req.open("POST", typeof (id) === "string" ? `${ApiHost}/upload/${id}` : `${ApiHost}/upload`);
             req.setRequestHeader("Content-Type", "application/octet-stream");
-            req.setRequestHeader("V-Content-Type", props.file.type);
+            req.setRequestHeader("V-Content-Type", props.file.type.length === 0 ? "application/octet-stream" : props.file.type);
             req.setRequestHeader("V-Filename", props.file.name);
-            req.setRequestHeader("V-Digest", buf2hex(digest));
             req.setRequestHeader("V-Full-Digest", fullDigest);
+            req.setRequestHeader("V-Segment", `${part}/${partOf}`)
             if (auth) {
                 req.setRequestHeader("Authorization", `Bearer ${auth}`);
             }
@@ -136,14 +136,16 @@ export function FileUpload(props) {
         // upload file in segments of 50MB
         const UploadSize = 50_000_000;
 
+        setUState(UploadState.Hashing);
         let digest = await crypto.subtle.digest(DigestAlgo, await props.file.arrayBuffer());
         let xhr = null;
-        const segments = props.file.size / UploadSize;
+        const segments = Math.ceil(props.file.size / UploadSize);
         for (let s = 0; s < segments; s++) {
+            calc.ResetLastLoaded();
             let offset = s * UploadSize;
             let slice = props.file.slice(offset, offset + UploadSize, props.file.type);
             let segment = await slice.arrayBuffer();
-            xhr = await xhrSegment(segment, xhr?.file?.id, xhr?.file?.metadata?.editSecret, buf2hex(digest));
+            xhr = await xhrSegment(segment, xhr?.file?.id, xhr?.file?.metadata?.editSecret, buf2hex(digest), s + 1, segments);
             if (!xhr.ok) {
                 break;
             }
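
The `Math.ceil` fix matters less for the loop than for the header: `for (let s = 0; s < 2.4; s++)` already runs three times for a 120 MB file, but `partOf` would have been sent as `2.4`, so the server's `int.TryParse` on `V-Segment: 1/2.4` would fail and the upload would silently fall back to the single-segment defaults of `1/1`. Rounding up yields `1/3`, `2/3`, `3/3`: segments covering bytes [0, 50 MB), [50 MB, 100 MB) and [100 MB, 120 MB). Hashing also moved out of `xhrSegment` into the caller, so the whole file is digested once up front (`V-Full-Digest`) instead of once per segment (the removed `V-Digest`).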

View File

@@ -4,6 +4,10 @@ export class RateCalculator {
         this.lastLoaded = 0;
     }
 
+    ResetLastLoaded() {
+        this.lastLoaded = 0;
+    }
+
     ReportProgress(amount) {
         this.reports.push({
             time: new Date().getTime(),
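
`ResetLastLoaded` exists because XHR progress events report bytes loaded within the current request only. The calculator presumably diffs each event's `loaded` value against `lastLoaded` to get a transfer delta; a new segment starts a fresh XHR whose `loaded` restarts at zero, so without the reset call in the upload loop above, the first delta of every segment after the first would come out negative.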