// Source: void.cat/VoidCat/Services/Files/S3FileStore.cs
using System.Net;
using Amazon.S3;
2022-03-01 16:48:42 +00:00
using Amazon.S3.Model;
using VoidCat.Model;
using VoidCat.Services.Abstractions;
namespace VoidCat.Services.Files;
/// <inheritdoc cref="VoidCat.Services.Abstractions.IFileStore" />
2022-03-01 16:48:42 +00:00
public class S3FileStore : StreamFileStore, IFileStore
{
// Collaborators; every one of them is assigned exactly once, in the constructor.
private readonly S3BlobConfig _config;
private readonly AmazonS3Client _client;
private readonly IFileMetadataStore _fileInfo;
private readonly IAggregateStatsCollector _statsCollector;
private readonly ICache _cache;

/// <summary>
/// Creates a file store that talks to the bucket described by <paramref name="settings"/>.
/// </summary>
/// <param name="settings">S3 settings; also used to create the S3 client.</param>
/// <param name="stats">Stats collector; also handed to the <see cref="StreamFileStore"/> base.</param>
/// <param name="fileInfo">Metadata store queried for file records by id.</param>
/// <param name="cache">Cache that holds multipart upload state between segments.</param>
public S3FileStore(S3BlobConfig settings, IAggregateStatsCollector stats, IFileMetadataStore fileInfo, ICache cache)
    : base(stats)
{
    _config = settings;
    _client = settings.CreateClient();
    _statsCollector = stats;
    _fileInfo = fileInfo;
    _cache = cache;
}

/// <summary>Key of this store: the name from the S3 configuration.</summary>
public string Key
{
    get { return _config.Name; }
}
2023-08-24 09:51:07 +00:00
/// <summary>
/// Checks whether an object keyed by <paramref name="id"/> is present in the configured bucket.
/// </summary>
/// <param name="id">File id; its string form is used as the object key.</param>
/// <returns>
/// <c>true</c> when the object metadata request succeeds, <c>false</c> when S3 answers
/// with HTTP 404 (Not Found).
/// </returns>
/// <exception cref="AmazonS3Exception">
/// S3 rejected the request for any reason other than the object being missing.
/// </exception>
public async ValueTask<bool> Exists(Guid id)
{
    var request = new GetObjectMetadataRequest
    {
        BucketName = _config.BucketName,
        Key = id.ToString()
    };

    try
    {
        await _client.GetObjectMetadataAsync(request);
        return true;
    }
    catch (AmazonS3Exception ex) when (ex.StatusCode == HttpStatusCode.NotFound)
    {
        // Only a missing object means "does not exist". Credential, permission and
        // connectivity failures propagate instead of being misreported as absence.
        return false;
    }
}
/// <summary>
/// Stores an upload in the configured bucket.
/// </summary>
/// <remarks>
/// Multipart payloads are handed to <c>IngressMultipart</c>. Everything else is sent
/// as a single PutObject request, after which the ingress size is tracked and the
/// result of <c>HandleCompletedUpload</c> is returned.
/// </remarks>
/// <param name="payload">Upload to store; its id becomes the object key.</param>
/// <param name="cts">Token passed to the S3 upload call.</param>
public async ValueTask<Database.File> Ingress(IngressPayload payload, CancellationToken cts)
{
    if (payload.IsMultipart)
    {
        return await IngressMultipart(payload, cts);
    }

    // A SHA-256 checksum is attached only when a digest is known and sending it is enabled.
    string? digestBase64 = null;
    if (payload.Meta.Digest != default && _config.SendChecksum)
    {
        digestBase64 = Convert.ToBase64String(payload.Meta.Digest!.FromHex());
    }

    var putRequest = new PutObjectRequest
    {
        BucketName = _config.BucketName,
        Key = payload.Id.ToString(),
        InputStream = payload.InStream,
        ContentType = "application/octet-stream",
        // The caller owns the input stream: do not rewind or close it.
        AutoResetStreamPosition = false,
        AutoCloseStream = false,
        ChecksumAlgorithm = _config.SendChecksum ? ChecksumAlgorithm.SHA256 : null,
        ChecksumSHA256 = digestBase64,
        DisablePayloadSigning = _config.DisablePayloadSigning
    };
    putRequest.Headers.ContentLength = (long)payload.Meta.Size;

    await _client.PutObjectAsync(putRequest, cts);
    await _statsCollector.TrackIngress(payload.Id, payload.Meta.Size);

    return HandleCompletedUpload(payload, payload.Meta.Size);
}
/// <inheritdoc />
public async ValueTask Egress(EgressRequest request, Stream outStream, CancellationToken cts)
{
    // Open the stored object and pass it to EgressFull together with the caller's
    // output stream; the source stream is disposed when the block exits, success or failure.
    await using (var source = await Open(request, cts))
    {
        await EgressFull(request.Id, source, outStream, cts);
    }
}
/// <inheritdoc />
public async ValueTask<EgressResult> StartEgress(EgressRequest request)
{
    // Without direct mode no pre-signed URL is produced; an empty result is returned.
    if (!_config.Direct)
    {
        return new EgressResult();
    }

    var meta = await _fileInfo.Get(request.Id);

    // Have S3 serve the object with the stored file name and MIME type.
    var headerOverrides = new ResponseHeaderOverrides
    {
        ContentDisposition = $"inline; filename=\"{meta?.Name}\"",
        ContentType = meta?.MimeType
    };

    var presignRequest = new GetPreSignedUrlRequest
    {
        BucketName = _config.BucketName,
        // The generated link stays valid for one hour.
        Expires = DateTime.UtcNow.AddHours(1),
        Key = request.Id.ToString(),
        ResponseHeaderOverrides = headerOverrides
    };

    var signedUrl = _client.GetPreSignedURL(presignRequest);
    return new EgressResult(new Uri(signedUrl));
}
/// <summary>
/// Lists the files stored in the configured bucket, sorted and paged as requested.
/// </summary>
/// <remarks>
/// All object listings are fetched by following continuation tokens, so buckets larger
/// than a single ListObjectsV2 response are listed completely. Objects whose key is
/// not a GUID, or that have no metadata record, are skipped when the page is
/// enumerated; <c>TotalResults</c> counts every object found in the bucket.
/// </remarks>
/// <param name="request">Page number, page size and sort settings.</param>
/// <returns>
/// The requested page, or an empty page with zero total results when S3 reports an error.
/// </returns>
public async ValueTask<PagedResult<Database.File>> ListFiles(PagedRequest request)
{
    try
    {
        // A single ListObjectsV2 response is capped (1000 keys by default), so keep
        // requesting pages until S3 stops handing back a continuation token.
        var allObjects = new List<S3Object>();
        string? continuationToken = null;
        bool hasMore;
        do
        {
            var listing = await _client.ListObjectsV2Async(new ListObjectsV2Request
            {
                BucketName = _config.BucketName,
                ContinuationToken = continuationToken
            });

            if (listing.S3Objects != null)
            {
                allObjects.AddRange(listing.S3Objects);
            }

            continuationToken = listing.NextContinuationToken;
            hasMore = listing.IsTruncated == true && !string.IsNullOrEmpty(continuationToken);
        } while (hasMore);

        var files = (request.SortBy, request.SortOrder) switch
        {
            (PagedSortBy.Date, PageSortOrder.Asc) => allObjects.OrderBy(a => a.LastModified),
            (PagedSortBy.Date, PageSortOrder.Dsc) => allObjects.OrderByDescending(a => a.LastModified),
            (PagedSortBy.Name, PageSortOrder.Asc) => allObjects.OrderBy(a => a.Key),
            (PagedSortBy.Name, PageSortOrder.Dsc) => allObjects.OrderByDescending(a => a.Key),
            (PagedSortBy.Size, PageSortOrder.Asc) => allObjects.OrderBy(a => a.Size),
            (PagedSortBy.Size, PageSortOrder.Dsc) => allObjects.OrderByDescending(a => a.Size),
            _ => allObjects.AsEnumerable()
        };

        // Resolves each S3 object of the page into its metadata record, lazily.
        async IAsyncEnumerable<Database.File> EnumerateFiles(IEnumerable<S3Object> page)
        {
            foreach (var item in page)
            {
                if (!Guid.TryParse(item.Key, out var gid)) continue;

                var obj = await _fileInfo.Get(gid);
                if (obj != default)
                {
                    yield return obj;
                }
            }
        }

        return new()
        {
            Page = request.Page,
            PageSize = request.PageSize,
            // Taken from the materialized list so the sort is not executed a second time.
            TotalResults = allObjects.Count,
            Data = EnumerateFiles(files.Skip(request.PageSize * request.Page).Take(request.PageSize))
        };
    }
    catch (AmazonS3Exception)
    {
        // Listing is best-effort: an S3 error yields an empty page rather than a failure.
        return new()
        {
            Page = request.Page,
            PageSize = request.PageSize,
            TotalResults = 0,
            Data = AsyncEnumerable.Empty<Database.File>()
        };
    }
}
/// <inheritdoc />
public async ValueTask DeleteFile(Guid id)
{
    // Objects are keyed by the string form of the file id.
    var objectKey = id.ToString();
    await _client.DeleteObjectAsync(_config.BucketName, objectKey);
}
2022-03-07 13:38:53 +00:00
/// <inheritdoc />
public async ValueTask<Stream> Open(EgressRequest request, CancellationToken cts)
{
    var getRequest = new GetObjectRequest
    {
        BucketName = _config.BucketName,
        Key = request.Id.ToString()
    };

    // Only the first requested range is forwarded to S3; any further ranges are ignored.
    if (request.Ranges.Any())
    {
        getRequest.ByteRange = new ByteRange(request.Ranges.First().OriginalString);
    }

    var response = await _client.GetObjectAsync(getRequest, cts);
    return response.ResponseStream;
}
2022-07-26 12:32:36 +00:00
/// <summary>
/// Uploads one segment of a multipart upload and, on the last segment, completes the upload.
/// </summary>
/// <remarks>
/// The S3 upload id is created on segment 1 and cached for one hour so later segments
/// can find it. Each uploaded part's number and ETag are appended to a cached list,
/// which is used to complete the upload once the final segment has been sent.
/// Any failure aborts the multipart upload (when an upload id is known) and rethrows.
/// </remarks>
/// <param name="payload">Segment to upload; <c>Segment</c> is used as the S3 part number.</param>
/// <param name="cts">Token passed to the S3 calls and to the buffering of the segment.</param>
/// <returns>The result of <c>HandleCompletedUpload</c> for this segment.</returns>
/// <exception cref="InvalidOperationException">
/// A segment other than the first arrived but no upload id is cached for the file.
/// </exception>
private async Task<Database.File> IngressMultipart(IngressPayload payload, CancellationToken cts)
{
    string? uploadId = null;
    var cacheKey = $"s3:{_config.Name}:multipart-upload-id:{payload.Id}";
    var partsCacheKey = $"s3:{_config.Name}:multipart-upload:{payload.Id}";

    try
    {
        if (payload.Segment == 1)
        {
            var mStart = new InitiateMultipartUploadRequest
            {
                BucketName = _config.BucketName,
                Key = payload.Id.ToString(),
                ContentType = "application/octet-stream",
                ChecksumAlgorithm = _config.SendChecksum ? ChecksumAlgorithm.SHA256 : null
            };

            var mStartResult = await _client.InitiateMultipartUploadAsync(mStart, cts);
            uploadId = mStartResult.UploadId;
            await _cache.Set(cacheKey, uploadId, TimeSpan.FromHours(1));
        }
        else
        {
            // The id may be missing (cache entry expired, or segment 1 never arrived).
            // Fail with a clear error instead of sending a null upload id to S3.
            var cachedUploadId = await _cache.Get<string>(cacheKey);
            if (string.IsNullOrEmpty(cachedUploadId))
            {
                throw new InvalidOperationException(
                    $"No multipart upload in progress for file {payload.Id} (segment {payload.Segment})");
            }

            uploadId = cachedUploadId;
        }

        // The segment is buffered to a temp file before upload. DeleteOnClose removes
        // the file as soon as the stream is closed, so nothing is left behind on disk,
        // whether the upload succeeds or fails.
        await using var fsTmp = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite,
            FileShare.Read, 4096, FileOptions.DeleteOnClose);
        await payload.InStream.CopyToAsync(fsTmp, cts);
        fsTmp.Seek(0, SeekOrigin.Begin);

        var segmentLength = (ulong)fsTmp.Length;
        var mBody = new UploadPartRequest
        {
            UploadId = uploadId,
            BucketName = _config.BucketName,
            PartNumber = payload.Segment,
            Key = payload.Id.ToString(),
            InputStream = fsTmp,
            DisablePayloadSigning = _config.DisablePayloadSigning
        };

        var bodyResponse = await _client.UploadPartAsync(mBody, cts);
        if (bodyResponse.HttpStatusCode != HttpStatusCode.OK)
        {
            throw new Exception("Upload aborted");
        }

        await _statsCollector.TrackIngress(payload.Id, segmentLength);
        // Parts are cached as "<partNumber>|<etag>" with the surrounding quotes stripped from the ETag.
        await _cache.AddToList(partsCacheKey, $"{payload.Segment}|{bodyResponse.ETag.Replace("\"", string.Empty)}");

        if (payload.Segment == payload.TotalSegments)
        {
            var parts = await _cache.GetList(partsCacheKey);
            var completeRequest = new CompleteMultipartUploadRequest
            {
                BucketName = _config.BucketName,
                Key = payload.Id.ToString(),
                UploadId = uploadId,
                ChecksumSHA256 = payload.Meta.Digest != default && _config.SendChecksum ?
                    Convert.ToBase64String(payload.Meta.Digest!.FromHex()) : null,
                // S3 requires the part list in ascending part-number order; the cached
                // list gives no ordering guarantee here, so sort explicitly.
                PartETags = parts.Select(a =>
                    {
                        var pSplit = a.Split('|');
                        return new PartETag
                        {
                            PartNumber = int.Parse(pSplit[0]),
                            ETag = pSplit[1]
                        };
                    })
                    .OrderBy(p => p.PartNumber)
                    .ToList()
            };

            var completeResponse = await _client.CompleteMultipartUploadAsync(completeRequest, cts);
            if (completeResponse.HttpStatusCode != HttpStatusCode.OK)
            {
                throw new Exception("Upload failed");
            }
        }

        return HandleCompletedUpload(payload, segmentLength);
    }
    catch
    {
        if (uploadId != null)
        {
            // Deliberately not tied to the caller's token: when the failure is a
            // cancellation, the abort must still reach S3 or the parts stay stored.
            await _client.AbortMultipartUploadAsync(new AbortMultipartUploadRequest
            {
                Key = payload.Id.ToString(),
                BucketName = _config.BucketName,
                UploadId = uploadId
            }, CancellationToken.None);
        }

        throw;
    }
}
2022-07-25 19:38:58 +00:00
}