Always abort multipart upload on exception

This commit is contained in:
Kieran 2022-07-27 19:41:34 +01:00
parent 5e3ceebf45
commit d7984db5bb
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
2 changed files with 86 additions and 75 deletions

View File

@ -19,8 +19,7 @@ public static class Extensions
RegionEndpoint = !string.IsNullOrEmpty(c.Region) ? RegionEndpoint.GetBySystemName(c.Region) : null, RegionEndpoint = !string.IsNullOrEmpty(c.Region) ? RegionEndpoint.GetBySystemName(c.Region) : null,
ServiceURL = c.ServiceUrl?.ToString(), ServiceURL = c.ServiceUrl?.ToString(),
UseHttp = c.ServiceUrl?.Scheme == "http", UseHttp = c.ServiceUrl?.Scheme == "http",
ForcePathStyle = true, ForcePathStyle = true
SignatureVersion = "4"
}); });
} }

View File

@ -165,87 +165,99 @@ public class S3FileStore : StreamFileStore, IFileStore
private async Task<PrivateVoidFile> IngressMultipart(IngressPayload payload, CancellationToken cts) private async Task<PrivateVoidFile> IngressMultipart(IngressPayload payload, CancellationToken cts)
{ {
string? uploadId; string? uploadId = null;
var cacheKey = $"s3:{_config.Name}:multipart-upload-id:{payload.Id}"; var cacheKey = $"s3:{_config.Name}:multipart-upload-id:{payload.Id}";
var partsCacheKey = $"s3:{_config.Name}:multipart-upload:{payload.Id}"; var partsCacheKey = $"s3:{_config.Name}:multipart-upload:{payload.Id}";
if (payload.Segment == 1) try
{ {
var mStart = new InitiateMultipartUploadRequest() if (payload.Segment == 1)
{ {
var mStart = new InitiateMultipartUploadRequest()
{
BucketName = _config.BucketName,
Key = payload.Id.ToString(),
ContentType = "application/octet-stream",
ChecksumAlgorithm = _config.SendChecksum ? ChecksumAlgorithm.SHA256 : null
};
var mStartResult = await _client.InitiateMultipartUploadAsync(mStart, cts);
uploadId = mStartResult.UploadId;
await _cache.Set(cacheKey, uploadId, TimeSpan.FromHours(1));
}
else
{
uploadId = await _cache.Get<string>(cacheKey);
}
// sadly it seems like we need a tmp file here
var tmpFile = Path.GetTempFileName();
await using var fsTmp = new FileStream(tmpFile, FileMode.Create, FileAccess.ReadWrite);
await payload.InStream.CopyToAsync(fsTmp, cts);
fsTmp.Seek(0, SeekOrigin.Begin);
var segmentLength = (ulong)fsTmp.Length;
var mBody = new UploadPartRequest()
{
UploadId = uploadId,
BucketName = _config.BucketName, BucketName = _config.BucketName,
PartNumber = payload.Segment,
Key = payload.Id.ToString(), Key = payload.Id.ToString(),
ContentType = "application/octet-stream", InputStream = fsTmp,
ChecksumAlgorithm = _config.SendChecksum ? ChecksumAlgorithm.SHA256 : null DisablePayloadSigning = _config.DisablePayloadSigning
}; };
var mStartResult = await _client.InitiateMultipartUploadAsync(mStart, cts); var bodyResponse = await _client.UploadPartAsync(mBody, cts);
uploadId = mStartResult.UploadId; if (bodyResponse.HttpStatusCode != HttpStatusCode.OK)
await _cache.Set(cacheKey, uploadId, TimeSpan.FromHours(1));
}
else
{
uploadId = await _cache.Get<string>(cacheKey);
}
// sadly it seems like we need a tmp file here
var tmpFile = Path.GetTempFileName();
await using var fsTmp = new FileStream(tmpFile, FileMode.Create, FileAccess.ReadWrite);
await payload.InStream.CopyToAsync(fsTmp, cts);
fsTmp.Seek(0, SeekOrigin.Begin);
var segmentLength = (ulong)fsTmp.Length;
var mBody = new UploadPartRequest()
{
UploadId = uploadId,
BucketName = _config.BucketName,
PartNumber = payload.Segment,
Key = payload.Id.ToString(),
InputStream = fsTmp,
DisablePayloadSigning = _config.DisablePayloadSigning
};
var bodyResponse = await _client.UploadPartAsync(mBody, cts);
if (bodyResponse.HttpStatusCode != HttpStatusCode.OK)
{
await _client.AbortMultipartUploadAsync(new()
{ {
BucketName = _config.BucketName, throw new Exception("Upload aborted");
UploadId = uploadId
}, cts);
throw new Exception("Upload aborted");
}
await _statsCollector.TrackIngress(payload.Id, segmentLength);
await _cache.AddToList(partsCacheKey, $"{payload.Segment}|{bodyResponse.ETag.Replace("\"", string.Empty)}");
if (payload.Segment == payload.TotalSegments)
{
var parts = await _cache.GetList(partsCacheKey);
var completeResponse = await _client.CompleteMultipartUploadAsync(new()
{
BucketName = _config.BucketName,
Key = payload.Id.ToString(),
UploadId = uploadId,
ChecksumSHA256 = payload.Meta.Digest != default && _config.SendChecksum ?
Convert.ToBase64String(payload.Meta.Digest!.FromHex()) : null,
PartETags = parts.Select(a =>
{
var pSplit = a.Split('|');
return new PartETag()
{
PartNumber = int.Parse(pSplit[0]),
ETag = pSplit[1]
};
}).ToList()
}, cts);
if (completeResponse.HttpStatusCode != HttpStatusCode.OK)
{
throw new Exception("Upload failed");
} }
}
return HandleCompletedUpload(payload, segmentLength); await _statsCollector.TrackIngress(payload.Id, segmentLength);
await _cache.AddToList(partsCacheKey, $"{payload.Segment}|{bodyResponse.ETag.Replace("\"", string.Empty)}");
if (payload.Segment == payload.TotalSegments)
{
var parts = await _cache.GetList(partsCacheKey);
var completeResponse = await _client.CompleteMultipartUploadAsync(new()
{
BucketName = _config.BucketName,
Key = payload.Id.ToString(),
UploadId = uploadId,
ChecksumSHA256 = payload.Meta.Digest != default && _config.SendChecksum ?
Convert.ToBase64String(payload.Meta.Digest!.FromHex()) : null,
PartETags = parts.Select(a =>
{
var pSplit = a.Split('|');
return new PartETag()
{
PartNumber = int.Parse(pSplit[0]),
ETag = pSplit[1]
};
}).ToList()
}, cts);
if (completeResponse.HttpStatusCode != HttpStatusCode.OK)
{
throw new Exception("Upload failed");
}
}
return HandleCompletedUpload(payload, segmentLength);
}
catch
{
if (uploadId != null)
{
await _client.AbortMultipartUploadAsync(new()
{
Key = payload.Id.ToString(),
BucketName = _config.BucketName,
UploadId = uploadId
}, cts);
}
throw;
}
} }
} }