2022-03-01 11:32:41 +00:00
|
|
|
|
using System.Buffers;
|
|
|
|
|
using VoidCat.Model;
|
|
|
|
|
using VoidCat.Model.Exceptions;
|
|
|
|
|
using VoidCat.Services.Abstractions;
|
|
|
|
|
|
|
|
|
|
namespace VoidCat.Services.Files;
|
|
|
|
|
|
2022-06-08 16:17:53 +00:00
|
|
|
|
/// <summary>
|
|
|
|
|
/// File store based on <see cref="Stream"/> objects
|
|
|
|
|
/// </summary>
|
2022-03-01 11:32:41 +00:00
|
|
|
|
public abstract class StreamFileStore
|
|
|
|
|
{
|
|
|
|
|
private const int BufferSize = 1_048_576;
|
|
|
|
|
private readonly IAggregateStatsCollector _stats;
|
|
|
|
|
|
2022-06-08 16:17:53 +00:00
|
|
|
|
protected StreamFileStore(IAggregateStatsCollector stats)
|
2022-03-01 11:32:41 +00:00
|
|
|
|
{
|
|
|
|
|
_stats = stats;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected async ValueTask EgressFromStream(Stream stream, EgressRequest request, Stream outStream,
|
|
|
|
|
CancellationToken cts)
|
|
|
|
|
{
|
|
|
|
|
if (request.Ranges.Any() && stream.CanSeek)
|
|
|
|
|
{
|
|
|
|
|
await EgressRanges(request.Id, request.Ranges, stream, outStream, cts);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
await EgressFull(request.Id, stream, outStream, cts);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-09 13:56:57 +00:00
|
|
|
|
protected async ValueTask<Database.File> IngressToStream(Stream outStream, IngressPayload payload,
|
2022-03-01 11:32:41 +00:00
|
|
|
|
CancellationToken cts)
|
|
|
|
|
{
|
|
|
|
|
var id = payload.Id;
|
|
|
|
|
var meta = payload.Meta;
|
|
|
|
|
if (payload.IsAppend)
|
|
|
|
|
{
|
|
|
|
|
if (meta?.EditSecret != null && meta.EditSecret != payload.EditSecret)
|
|
|
|
|
{
|
|
|
|
|
throw new VoidNotAllowedException("Edit secret incorrect!");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-06-13 18:07:14 +00:00
|
|
|
|
var total = await IngressInternal(id, payload.InStream, outStream, cts);
|
|
|
|
|
return HandleCompletedUpload(payload, total);
|
2022-03-01 16:48:42 +00:00
|
|
|
|
}
|
|
|
|
|
|
2023-05-09 13:56:57 +00:00
|
|
|
|
protected Database.File HandleCompletedUpload(IngressPayload payload, ulong totalSize)
|
2022-03-01 16:48:42 +00:00
|
|
|
|
{
|
|
|
|
|
var meta = payload.Meta;
|
2022-03-01 11:32:41 +00:00
|
|
|
|
if (payload.IsAppend)
|
|
|
|
|
{
|
2023-01-26 13:49:38 +00:00
|
|
|
|
meta = meta with
|
2022-03-01 11:32:41 +00:00
|
|
|
|
{
|
2023-05-09 13:56:57 +00:00
|
|
|
|
Id = payload.Id,
|
2022-03-01 16:48:42 +00:00
|
|
|
|
Size = meta.Size + totalSize
|
2022-03-01 11:32:41 +00:00
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2023-01-26 13:49:38 +00:00
|
|
|
|
meta = meta with
|
2022-03-01 11:32:41 +00:00
|
|
|
|
{
|
2023-05-09 13:56:57 +00:00
|
|
|
|
Id = payload.Id,
|
|
|
|
|
Uploaded = DateTime.UtcNow,
|
2022-03-01 11:32:41 +00:00
|
|
|
|
EditSecret = Guid.NewGuid(),
|
2022-03-01 16:48:42 +00:00
|
|
|
|
Size = totalSize
|
2022-03-01 11:32:41 +00:00
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-09 13:56:57 +00:00
|
|
|
|
return meta;
|
2022-03-01 11:32:41 +00:00
|
|
|
|
}
|
2022-06-08 16:17:53 +00:00
|
|
|
|
|
2022-06-13 18:07:14 +00:00
|
|
|
|
private async Task<ulong> IngressInternal(Guid id, Stream ingress, Stream outStream,
|
2022-06-08 16:17:53 +00:00
|
|
|
|
CancellationToken cts)
|
2022-03-01 11:32:41 +00:00
|
|
|
|
{
|
|
|
|
|
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
|
|
|
|
var total = 0UL;
|
2022-06-13 18:07:14 +00:00
|
|
|
|
int readLength, offset = 0;
|
2022-03-01 11:32:41 +00:00
|
|
|
|
while ((readLength = await ingress.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
|
|
|
|
{
|
|
|
|
|
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
|
|
|
|
{
|
|
|
|
|
// read until buffer full
|
|
|
|
|
offset += readLength;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var totalRead = readLength + offset;
|
|
|
|
|
var buf = buffer.Memory[..totalRead];
|
2022-03-01 16:48:42 +00:00
|
|
|
|
await outStream.WriteAsync(buf, cts);
|
2022-03-01 11:32:41 +00:00
|
|
|
|
await _stats.TrackIngress(id, (ulong) buf.Length);
|
|
|
|
|
total += (ulong) buf.Length;
|
|
|
|
|
offset = 0;
|
|
|
|
|
}
|
|
|
|
|
|
2022-06-13 18:07:14 +00:00
|
|
|
|
return total;
|
2022-03-01 11:32:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2022-03-01 16:48:42 +00:00
|
|
|
|
protected async Task EgressFull(Guid id, Stream inStream, Stream outStream,
|
2022-03-01 11:32:41 +00:00
|
|
|
|
CancellationToken cts)
|
|
|
|
|
{
|
|
|
|
|
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
|
|
|
|
int readLength = 0, offset = 0;
|
|
|
|
|
while ((readLength = await inStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
|
|
|
|
{
|
|
|
|
|
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
|
|
|
|
{
|
|
|
|
|
// read until buffer full
|
|
|
|
|
offset += readLength;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var fullSize = readLength + offset;
|
|
|
|
|
await outStream.WriteAsync(buffer.Memory[..fullSize], cts);
|
|
|
|
|
await _stats.TrackEgress(id, (ulong) fullSize);
|
|
|
|
|
await outStream.FlushAsync(cts);
|
|
|
|
|
offset = 0;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async Task EgressRanges(Guid id, IEnumerable<RangeRequest> ranges, Stream inStream, Stream outStream,
|
|
|
|
|
CancellationToken cts)
|
|
|
|
|
{
|
|
|
|
|
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
|
|
|
|
|
foreach (var range in ranges)
|
|
|
|
|
{
|
|
|
|
|
inStream.Seek(range.Start ?? range.End ?? 0L,
|
|
|
|
|
range.Start.HasValue ? SeekOrigin.Begin : SeekOrigin.End);
|
|
|
|
|
|
|
|
|
|
int readLength = 0, offset = 0;
|
|
|
|
|
var dataRemaining = range.Size ?? 0L;
|
|
|
|
|
while ((readLength = await inStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
|
|
|
|
|
{
|
|
|
|
|
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
|
|
|
|
|
{
|
|
|
|
|
// read until buffer full
|
|
|
|
|
offset += readLength;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var fullSize = readLength + offset;
|
|
|
|
|
var toWrite = Math.Min(fullSize, dataRemaining);
|
|
|
|
|
await outStream.WriteAsync(buffer.Memory[..(int) toWrite], cts);
|
|
|
|
|
await _stats.TrackEgress(id, (ulong) toWrite);
|
|
|
|
|
await outStream.FlushAsync(cts);
|
|
|
|
|
dataRemaining -= toWrite;
|
|
|
|
|
offset = 0;
|
|
|
|
|
|
|
|
|
|
if (dataRemaining == 0)
|
|
|
|
|
{
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|