From dfeb4d41de3e24a9de18586ea18b5f88476d8080 Mon Sep 17 00:00:00 2001 From: Kieran Date: Tue, 1 Mar 2022 11:32:41 +0000 Subject: [PATCH] Extract base stream filestore --- VoidCat/Model/IngressPayload.cs | 4 +- .../Services/Files/LocalDiskFileStorage.cs | 160 +--------------- VoidCat/Services/Files/StreamFileStore.cs | 174 ++++++++++++++++++ 3 files changed, 182 insertions(+), 156 deletions(-) create mode 100644 VoidCat/Services/Files/StreamFileStore.cs diff --git a/VoidCat/Model/IngressPayload.cs b/VoidCat/Model/IngressPayload.cs index 998b8cc..0399577 100644 --- a/VoidCat/Model/IngressPayload.cs +++ b/VoidCat/Model/IngressPayload.cs @@ -2,9 +2,9 @@ namespace VoidCat.Model; public sealed record IngressPayload(Stream InStream, SecretVoidFileMeta Meta) { - public Guid? Id { get; init; } + public Guid Id { get; init; } = Guid.NewGuid(); public Guid? EditSecret { get; init; } public string? Hash { get; init; } - public bool IsAppend => Id.HasValue && EditSecret.HasValue; + public bool IsAppend => EditSecret.HasValue; } \ No newline at end of file diff --git a/VoidCat/Services/Files/LocalDiskFileStorage.cs b/VoidCat/Services/Files/LocalDiskFileStorage.cs index 9c03941..c8b6f60 100644 --- a/VoidCat/Services/Files/LocalDiskFileStorage.cs +++ b/VoidCat/Services/Files/LocalDiskFileStorage.cs @@ -1,29 +1,23 @@ -using System.Buffers; -using System.Security.Cryptography; using VoidCat.Model; using VoidCat.Model.Exceptions; using VoidCat.Services.Abstractions; namespace VoidCat.Services.Files; -public class LocalDiskFileStore : IFileStore +public class LocalDiskFileStore : StreamFileStore, IFileStore { - private const int BufferSize = 1_048_576; private readonly ILogger _logger; private readonly VoidSettings _settings; - private readonly IAggregateStatsCollector _stats; private readonly IFileMetadataStore _metadataStore; private readonly IFileInfoManager _fileInfo; - private readonly IUserUploadsStore _userUploads; public LocalDiskFileStore(ILogger logger, VoidSettings settings, IAggregateStatsCollector stats, IFileMetadataStore metadataStore, IFileInfoManager fileInfo, IUserUploadsStore userUploads) + : base(stats, metadataStore, userUploads) { _settings = settings; - _stats = stats; _metadataStore = metadataStore; _fileInfo = fileInfo; - _userUploads = userUploads; _logger = logger; if (!Directory.Exists(_settings.DataDirectory)) @@ -38,71 +32,15 @@ public class LocalDiskFileStore : IFileStore if (!File.Exists(path)) throw new VoidFileNotFoundException(request.Id); await using var fs = new FileStream(path, FileMode.Open, FileAccess.Read); - if (request.Ranges.Any()) - { - await EgressRanges(request.Id, request.Ranges, fs, outStream, cts); - } - else - { - await EgressFull(request.Id, fs, outStream, cts); - } + await EgressFromStream(fs, request, outStream, cts); } public async ValueTask Ingress(IngressPayload payload, CancellationToken cts) { - var id = payload.Id ?? Guid.NewGuid(); - var fPath = MapPath(id); - var meta = payload.Meta; - if (payload.IsAppend) - { - if (meta?.EditSecret != null && meta.EditSecret != payload.EditSecret) - { - throw new VoidNotAllowedException("Edit secret incorrect!"); - } - } - - // open file + var fPath = MapPath(payload.Id); await using var fsTemp = new FileStream(fPath, payload.IsAppend ? FileMode.Append : FileMode.Create, FileAccess.Write); - - var (total, hash) = await IngressInternal(id, payload.InStream, fsTemp, cts); - - if (payload.Hash != null && !hash.Equals(payload.Hash, StringComparison.InvariantCultureIgnoreCase)) - { - throw new CryptographicException("Invalid file hash"); - } - - if (payload.IsAppend) - { - meta = meta! with - { - Size = meta.Size + total - }; - } - else - { - meta = meta! with - { - Digest = hash, - Uploaded = DateTimeOffset.UtcNow, - EditSecret = Guid.NewGuid(), - Size = total - }; - } - - await _metadataStore.Set(id, meta); - var vf = new PrivateVoidFile() - { - Id = id, - Metadata = meta - }; - - if (meta.Uploader.HasValue) - { - await _userUploads.AddFile(meta.Uploader.Value, vf); - } - - return vf; + return await IngressToStream(fsTemp, payload, cts); } public ValueTask> ListFiles(PagedRequest request) @@ -160,92 +98,6 @@ public class LocalDiskFileStore : IFileStore await _metadataStore.Delete(id); } - private async Task<(ulong, string)> IngressInternal(Guid id, Stream ingress, Stream fs, CancellationToken cts) - { - using var buffer = MemoryPool.Shared.Rent(BufferSize); - var total = 0UL; - int readLength = 0, offset = 0; - var sha = SHA256.Create(); - while ((readLength = await ingress.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0) - { - if (readLength != 0 && offset + readLength < buffer.Memory.Length) - { - // read until buffer full - offset += readLength; - continue; - } - - var totalRead = readLength + offset; - var buf = buffer.Memory[..totalRead]; - await fs.WriteAsync(buf, cts); - await _stats.TrackIngress(id, (ulong)buf.Length); - sha.TransformBlock(buf.ToArray(), 0, buf.Length, null, 0); - total += (ulong)buf.Length; - offset = 0; - } - - sha.TransformFinalBlock(Array.Empty(), 0, 0); - return (total, BitConverter.ToString(sha.Hash!).Replace("-", string.Empty)); - } - - private async Task EgressFull(Guid id, FileStream fileStream, Stream outStream, - CancellationToken cts) - { - using var buffer = MemoryPool.Shared.Rent(BufferSize); - int readLength = 0, offset = 0; - while ((readLength = await fileStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0) - { - if (readLength != 0 && offset + readLength < buffer.Memory.Length) - { - // read until buffer full - offset += readLength; - continue; - } - - var fullSize = readLength + offset; - await outStream.WriteAsync(buffer.Memory[..fullSize], cts); - await _stats.TrackEgress(id, (ulong)fullSize); - await outStream.FlushAsync(cts); - offset = 0; - } - } - - private async Task EgressRanges(Guid id, IEnumerable ranges, FileStream fileStream, Stream outStream, - CancellationToken cts) - { - using var buffer = MemoryPool.Shared.Rent(BufferSize); - foreach (var range in ranges) - { - fileStream.Seek(range.Start ?? range.End ?? 0L, - range.Start.HasValue ? SeekOrigin.Begin : SeekOrigin.End); - - int readLength = 0, offset = 0; - var dataRemaining = range.Size ?? 0L; - while ((readLength = await fileStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0) - { - if (readLength != 0 && offset + readLength < buffer.Memory.Length) - { - // read until buffer full - offset += readLength; - continue; - } - - var fullSize = readLength + offset; - var toWrite = Math.Min(fullSize, dataRemaining); - await outStream.WriteAsync(buffer.Memory[..(int)toWrite], cts); - await _stats.TrackEgress(id, (ulong)toWrite); - await outStream.FlushAsync(cts); - dataRemaining -= toWrite; - offset = 0; - - if (dataRemaining == 0) - { - break; - } - } - } - } - private string MapPath(Guid id) => Path.Join(_settings.DataDirectory, id.ToString()); -} +} \ No newline at end of file diff --git a/VoidCat/Services/Files/StreamFileStore.cs b/VoidCat/Services/Files/StreamFileStore.cs new file mode 100644 index 0000000..a6badf9 --- /dev/null +++ b/VoidCat/Services/Files/StreamFileStore.cs @@ -0,0 +1,174 @@ +using System.Buffers; +using System.Security.Cryptography; +using VoidCat.Model; +using VoidCat.Model.Exceptions; +using VoidCat.Services.Abstractions; + +namespace VoidCat.Services.Files; + +public abstract class StreamFileStore +{ + private const int BufferSize = 1_048_576; + private readonly IAggregateStatsCollector _stats; + private readonly IFileMetadataStore _metadataStore; + private readonly IUserUploadsStore _userUploads; + + protected StreamFileStore(IAggregateStatsCollector stats, IFileMetadataStore metadataStore, + IUserUploadsStore userUploads) + { + _stats = stats; + _metadataStore = metadataStore; + _userUploads = userUploads; + } + + protected async ValueTask EgressFromStream(Stream stream, EgressRequest request, Stream outStream, + CancellationToken cts) + { + if (request.Ranges.Any() && stream.CanSeek) + { + await EgressRanges(request.Id, request.Ranges, stream, outStream, cts); + } + else + { + await EgressFull(request.Id, stream, outStream, cts); + } + } + + protected async ValueTask IngressToStream(Stream stream, IngressPayload payload, + CancellationToken cts) + { + var id = payload.Id; + var meta = payload.Meta; + if (payload.IsAppend) + { + if (meta?.EditSecret != null && meta.EditSecret != payload.EditSecret) + { + throw new VoidNotAllowedException("Edit secret incorrect!"); + } + } + + var (total, hash) = await IngressInternal(id, payload.InStream, stream, cts); + if (payload.Hash != null && !hash.Equals(payload.Hash, StringComparison.InvariantCultureIgnoreCase)) + { + throw new CryptographicException("Invalid file hash"); + } + + if (payload.IsAppend) + { + meta = meta! with + { + Size = meta.Size + total + }; + } + else + { + meta = meta! with + { + Digest = hash, + Uploaded = DateTimeOffset.UtcNow, + EditSecret = Guid.NewGuid(), + Size = total + }; + } + + await _metadataStore.Set(id, meta); + var vf = new PrivateVoidFile() + { + Id = id, + Metadata = meta + }; + + if (meta.Uploader.HasValue) + { + await _userUploads.AddFile(meta.Uploader.Value, vf); + } + + return vf; + } + + private async Task<(ulong, string)> IngressInternal(Guid id, Stream ingress, Stream fs, CancellationToken cts) + { + using var buffer = MemoryPool.Shared.Rent(BufferSize); + var total = 0UL; + int readLength = 0, offset = 0; + var sha = SHA256.Create(); + while ((readLength = await ingress.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0) + { + if (readLength != 0 && offset + readLength < buffer.Memory.Length) + { + // read until buffer full + offset += readLength; + continue; + } + + var totalRead = readLength + offset; + var buf = buffer.Memory[..totalRead]; + await fs.WriteAsync(buf, cts); + await _stats.TrackIngress(id, (ulong) buf.Length); + sha.TransformBlock(buf.ToArray(), 0, buf.Length, null, 0); + total += (ulong) buf.Length; + offset = 0; + } + + sha.TransformFinalBlock(Array.Empty(), 0, 0); + return (total, BitConverter.ToString(sha.Hash!).Replace("-", string.Empty)); + } + + private async Task EgressFull(Guid id, Stream inStream, Stream outStream, + CancellationToken cts) + { + using var buffer = MemoryPool.Shared.Rent(BufferSize); + int readLength = 0, offset = 0; + while ((readLength = await inStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0) + { + if (readLength != 0 && offset + readLength < buffer.Memory.Length) + { + // read until buffer full + offset += readLength; + continue; + } + + var fullSize = readLength + offset; + await outStream.WriteAsync(buffer.Memory[..fullSize], cts); + await _stats.TrackEgress(id, (ulong) fullSize); + await outStream.FlushAsync(cts); + offset = 0; + } + } + + private async Task EgressRanges(Guid id, IEnumerable ranges, Stream inStream, Stream outStream, + CancellationToken cts) + { + using var buffer = MemoryPool.Shared.Rent(BufferSize); + foreach (var range in ranges) + { + inStream.Seek(range.Start ?? range.End ?? 0L, + range.Start.HasValue ? SeekOrigin.Begin : SeekOrigin.End); + + int readLength = 0, offset = 0; + var dataRemaining = range.Size ?? 0L; + while ((readLength = await inStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0) + { + if (readLength != 0 && offset + readLength < buffer.Memory.Length) + { + // read until buffer full + offset += readLength; + continue; + } + + var fullSize = readLength + offset; + var toWrite = Math.Min(fullSize, dataRemaining); + await outStream.WriteAsync(buffer.Memory[..(int) toWrite], cts); + await _stats.TrackEgress(id, (ulong) toWrite); + await outStream.FlushAsync(cts); + dataRemaining -= toWrite; + offset = 0; + + if (dataRemaining == 0) + { + break; + } + } + } + } +} \ No newline at end of file