Persist stats, redis

This commit is contained in:
Kieran 2022-02-16 23:19:31 +00:00
parent 575ab74b14
commit f47ca6cb5e
Signed by: Kieran
GPG Key ID: DE71CEB3925BE941
13 changed files with 193 additions and 55 deletions

View File

@ -47,6 +47,11 @@ public class DownloadController : Controller
else if (egressReq.Ranges.Count() == 1)
{
Response.StatusCode = (int)HttpStatusCode.PartialContent;
if (egressReq.Ranges.Sum(a => a.Size) == 0)
{
Response.StatusCode = (int)HttpStatusCode.RequestedRangeNotSatisfiable;
return;
}
}
else
{

View File

@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Mvc;
using Prometheus;
using VoidCat.Model;
using VoidCat.Services;
using VoidCat.Services.Abstractions;
@ -8,19 +9,19 @@ namespace VoidCat.Controllers
[Route("stats")]
public class StatsController : Controller
{
private readonly IStatsCollector _statsCollector;
private readonly IStatsReporter _statsReporter;
private readonly IFileStore _fileStore;
public StatsController(IStatsCollector statsCollector, IFileStore fileStore)
public StatsController(IStatsReporter statsReporter, IFileStore fileStore)
{
_statsCollector = statsCollector;
_statsReporter = statsReporter;
_fileStore = fileStore;
}
[HttpGet]
public async Task<GlobalStats> GetGlobalStats()
{
var bw = await _statsCollector.GetBandwidth();
var bw = await _statsReporter.GetBandwidth();
var bytes = 0UL;
await foreach (var vf in _fileStore.ListFiles())
{
@ -33,7 +34,7 @@ namespace VoidCat.Controllers
[Route("{id}")]
public async Task<FileStats> GetFileStats([FromRoute] string id)
{
var bw = await _statsCollector.GetBandwidth(id.FromBase58Guid());
var bw = await _statsReporter.GetBandwidth(id.FromBase58Guid());
return new(bw);
}
}

View File

@ -7,6 +7,8 @@
public TorSettings? TorSettings { get; init; }
public JwtSettings JwtSettings { get; init; } = new("void_cat_internal", "default_key");
public string? Redis { get; init; }
}
public sealed record TorSettings(Uri TorControl, string PrivateKey, string ControlPassword);

View File

@ -2,6 +2,7 @@ using System.Text;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using Prometheus;
using StackExchange.Redis;
using VoidCat.Model;
using VoidCat.Services;
using VoidCat.Services.Abstractions;
@ -16,6 +17,14 @@ services.AddSingleton(voidSettings);
var seqSettings = configuration.GetSection("Seq");
builder.Logging.AddSeq(seqSettings);
var useRedis = !string.IsNullOrEmpty(voidSettings.Redis);
if (useRedis)
{
var cx = await ConnectionMultiplexer.ConnectAsync(voidSettings.Redis);
services.AddSingleton(cx);
services.AddSingleton(cx.GetDatabase());
}
services.AddRouting();
services.AddControllers().AddNewtonsoftJson();
services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
@ -32,11 +41,23 @@ services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
};
});
services.AddMemoryCache();
services.AddScoped<IFileMetadataStore, LocalDiskFileMetadataStore>();
services.AddScoped<IFileStore, LocalDiskFileStore>();
services.AddScoped<IAggregateStatsCollector, AggregateStatsCollector>();
services.AddScoped<IStatsCollector, PrometheusStatsCollector>();
if (useRedis)
{
services.AddScoped<RedisStatsController>();
services.AddScoped<IStatsCollector>(svc => svc.GetRequiredService<RedisStatsController>());
services.AddScoped<IStatsReporter>(svc => svc.GetRequiredService<RedisStatsController>());
}
else
{
services.AddMemoryCache();
services.AddScoped<InMemoryStatsController>();
services.AddScoped<IStatsReporter>(svc => svc.GetRequiredService<InMemoryStatsController>());
services.AddScoped<IStatsCollector>(svc => svc.GetRequiredService<InMemoryStatsController>());
}
var app = builder.Build();

View File

@ -1,12 +1,19 @@
namespace VoidCat.Services.Abstractions;
public interface IAggregateStatsCollector : IStatsCollector
{
}
public interface IStatsCollector
{
ValueTask TrackIngress(Guid id, ulong amount);
ValueTask TrackEgress(Guid id, ulong amount);
}
public interface IStatsReporter
{
ValueTask<Bandwidth> GetBandwidth();
ValueTask<Bandwidth> GetBandwidth(Guid id);
}
public sealed record Bandwidth(ulong Ingress, ulong Egress);
public sealed record Bandwidth(ulong Ingress, ulong Egress);

View File

@ -0,0 +1,29 @@
using VoidCat.Services.Abstractions;
namespace VoidCat.Services;
public class AggregateStatsCollector : IAggregateStatsCollector
{
private readonly IEnumerable<IStatsCollector> _collectors;
public AggregateStatsCollector(IEnumerable<IStatsCollector> collectors)
{
_collectors = collectors;
}
public async ValueTask TrackIngress(Guid id, ulong amount)
{
foreach (var collector in _collectors)
{
await collector.TrackIngress(id, amount);
}
}
public async ValueTask TrackEgress(Guid id, ulong amount)
{
foreach (var collector in _collectors)
{
await collector.TrackEgress(id, amount);
}
}
}

View File

@ -3,12 +3,12 @@ using VoidCat.Services.Abstractions;
namespace VoidCat.Services;
public class InMemoryStatsCollector : IStatsCollector
public class InMemoryStatsController : IStatsCollector, IStatsReporter
{
private static Guid _global = new Guid("{A98DFDCC-C4E1-4D42-B818-912086FC6157}");
private static readonly Guid Global = new Guid("{A98DFDCC-C4E1-4D42-B818-912086FC6157}");
private readonly IMemoryCache _cache;
public InMemoryStatsCollector(IMemoryCache cache)
public InMemoryStatsController(IMemoryCache cache)
{
_cache = cache;
}
@ -16,19 +16,19 @@ public class InMemoryStatsCollector : IStatsCollector
public ValueTask TrackIngress(Guid id, ulong amount)
{
Incr(IngressKey(id), amount);
Incr(IngressKey(_global), amount);
Incr(IngressKey(Global), amount);
return ValueTask.CompletedTask;
}
public ValueTask TrackEgress(Guid id, ulong amount)
{
Incr(EgressKey(id), amount);
Incr(EgressKey(_global), amount);
Incr(EgressKey(Global), amount);
return ValueTask.CompletedTask;
}
public ValueTask<Bandwidth> GetBandwidth()
=> ValueTask.FromResult(GetBandwidthInternal(_global));
=> ValueTask.FromResult(GetBandwidthInternal(Global));
public ValueTask<Bandwidth> GetBandwidth(Guid id)
=> ValueTask.FromResult(GetBandwidthInternal(id));
@ -49,4 +49,4 @@ public class InMemoryStatsCollector : IStatsCollector
private string IngressKey(Guid id) => $"stats:ingress:{id}";
private string EgressKey(Guid id) => $"stats:egress:{id}";
}
}

View File

@ -8,11 +8,12 @@ namespace VoidCat.Services;
public class LocalDiskFileStore : IFileStore
{
private const int BufferSize = 1024 * 1024;
private readonly VoidSettings _settings;
private readonly IStatsCollector _stats;
private readonly IAggregateStatsCollector _stats;
private readonly IFileMetadataStore _metadataStore;
public LocalDiskFileStore(VoidSettings settings, IStatsCollector stats,
public LocalDiskFileStore(VoidSettings settings, IAggregateStatsCollector stats,
IFileMetadataStore metadataStore)
{
_settings = settings;
@ -117,17 +118,26 @@ public class LocalDiskFileStore : IFileStore
private async Task<(ulong, string)> IngressInternal(Guid id, Stream ingress, Stream fs, CancellationToken cts)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
var total = 0UL;
var readLength = 0;
int readLength = 0, offset = 0;
var sha = SHA256.Create();
while ((readLength = await ingress.ReadAsync(buffer.Memory, cts)) > 0)
while ((readLength = await ingress.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
{
var buf = buffer.Memory[..readLength];
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
{
// read until buffer full
offset += readLength;
continue;
}
var totalRead = readLength + offset;
var buf = buffer.Memory[..totalRead];
await fs.WriteAsync(buf, cts);
await _stats.TrackIngress(id, (ulong) readLength);
await _stats.TrackIngress(id, (ulong)buf.Length);
sha.TransformBlock(buf.ToArray(), 0, buf.Length, null, 0);
total += (ulong) readLength;
total += (ulong)buf.Length;
offset = 0;
}
sha.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
@ -137,39 +147,61 @@ public class LocalDiskFileStore : IFileStore
private async Task EgressFull(Guid id, FileStream fileStream, Stream outStream,
CancellationToken cts)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
var readLength = 0;
while ((readLength = await fileStream.ReadAsync(buffer.Memory, cts)) > 0)
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
int readLength = 0, offset = 0;
while ((readLength = await fileStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
{
await outStream.WriteAsync(buffer.Memory[..readLength], cts);
await _stats.TrackEgress(id, (ulong) readLength);
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
{
// read until buffer full
offset += readLength;
continue;
}
var fullSize = readLength + offset;
await outStream.WriteAsync(buffer.Memory[..fullSize], cts);
await _stats.TrackEgress(id, (ulong)fullSize);
await outStream.FlushAsync(cts);
offset = 0;
}
}
private async Task EgressRanges(Guid id, IEnumerable<RangeRequest> ranges, FileStream fileStream, Stream outStream,
CancellationToken cts)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
using var buffer = MemoryPool<byte>.Shared.Rent(BufferSize);
foreach (var range in ranges)
{
fileStream.Seek(range.Start ?? range.End ?? 0L,
range.Start.HasValue ? SeekOrigin.Begin : SeekOrigin.End);
var readLength = 0;
int readLength = 0, offset = 0;
var dataRemaining = range.Size ?? 0L;
while ((readLength = await fileStream.ReadAsync(buffer.Memory, cts)) > 0
&& dataRemaining > 0)
while ((readLength = await fileStream.ReadAsync(buffer.Memory[offset..], cts)) > 0 || offset != 0)
{
var toWrite = Math.Min(readLength, dataRemaining);
await outStream.WriteAsync(buffer.Memory[..(int) toWrite], cts);
await _stats.TrackEgress(id, (ulong) toWrite);
dataRemaining -= toWrite;
if (readLength != 0 && offset + readLength < buffer.Memory.Length)
{
// read until buffer full
offset += readLength;
continue;
}
var fullSize = readLength + offset;
var toWrite = Math.Min(fullSize, dataRemaining);
await outStream.WriteAsync(buffer.Memory[..(int)toWrite], cts);
await _stats.TrackEgress(id, (ulong)toWrite);
await outStream.FlushAsync(cts);
dataRemaining -= toWrite;
offset = 0;
if (dataRemaining == 0)
{
break;
}
}
}
}
private string MapPath(Guid id) =>
Path.Join(_settings.DataDirectory, id.ToString());
}
}

View File

@ -24,15 +24,4 @@ public class PrometheusStatsCollector : IStatsCollector
_egress.WithLabels(id.ToString()).Inc(amount);
return ValueTask.CompletedTask;
}
public ValueTask<Bandwidth> GetBandwidth()
{
return ValueTask.FromResult<Bandwidth>(new((ulong) _ingress.Value, (ulong) _egress.Value));
}
public ValueTask<Bandwidth> GetBandwidth(Guid id)
{
return ValueTask.FromResult<Bandwidth>(new((ulong) _ingress.Labels(id.ToString()).Value,
(ulong) _egress.Labels(id.ToString()).Value));
}
}

View File

@ -0,0 +1,51 @@
using StackExchange.Redis;
using VoidCat.Services.Abstractions;
namespace VoidCat.Services;
public class RedisStatsController : IStatsReporter, IStatsCollector
{
private const string GlobalEgress = "stats:egress:global";
private const string GlobalIngress = "stats:ingress:global";
private readonly IDatabase _redis;
public RedisStatsController(IDatabase redis)
{
_redis = redis;
}
public async ValueTask<Bandwidth> GetBandwidth()
{
var egress = _redis.StringGetAsync(GlobalEgress);
var ingress = _redis.StringGetAsync(GlobalIngress);
await Task.WhenAll(egress, ingress);
return new((ulong)ingress.Result, (ulong)egress.Result);
}
public async ValueTask<Bandwidth> GetBandwidth(Guid id)
{
var egress = _redis.StringGetAsync(formatEgressKey(id));
var ingress = _redis.StringGetAsync(formatIngressKey(id));
await Task.WhenAll(egress, ingress);
return new((ulong)ingress.Result, (ulong)egress.Result);
}
public async ValueTask TrackIngress(Guid id, ulong amount)
{
await Task.WhenAll(
_redis.StringIncrementAsync(GlobalIngress, amount),
_redis.StringIncrementAsync(formatIngressKey(id), amount));
}
public async ValueTask TrackEgress(Guid id, ulong amount)
{
await Task.WhenAll(
_redis.StringIncrementAsync(GlobalEgress, amount),
_redis.StringIncrementAsync(formatEgressKey(id), amount));
}
private RedisKey formatIngressKey(Guid id) => $"stats:{id}:ingress";
private RedisKey formatEgressKey(Guid id) => $"stats:{id}:egress";
}

View File

@ -17,6 +17,7 @@
<PackageReference Include="NBitcoin" Version="6.0.19" />
<PackageReference Include="prometheus-net.AspNetCore" Version="5.0.2" />
<PackageReference Include="Seq.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.5.27-prerelease" />
</ItemGroup>
<ItemGroup>
<!-- Don't publish the SPA source files, but do show them in the project files list -->

View File

@ -146,12 +146,12 @@ export function FileUpload(props) {
function renderStatus() {
if (result) {
return (
return uState === UploadState.Done ?
<dl>
<dt>Link:</dt>
<dd><a target="_blank" href={`/${result.id}`}>{result.id}</a></dd>
</dl>
);
: <b>{result}</b>;
} else {
return (
<dl>

View File

@ -1,5 +1,5 @@
import { useEffect, useState } from "react";
import { FormatBytes } from "./Util";
import {useEffect, useState} from "react";
import {FormatBytes} from "./Util";
import "./GlobalStats.css";
@ -18,11 +18,11 @@ export function GlobalStats(props) {
return (
<div className="stats">
<div>Ingress:</div>
<div>{FormatBytes(stats?.bandwidth?.ingress ?? 0)}</div>
<div>{FormatBytes(stats?.bandwidth?.ingress ?? 0, 2)}</div>
<div>Egress:</div>
<div>{FormatBytes(stats?.bandwidth?.egress ?? 0)}</div>
<div>{FormatBytes(stats?.bandwidth?.egress ?? 0, 2)}</div>
<div>Storage:</div>
<div>{FormatBytes(stats?.totalBytes ?? 0)}</div>
<div>{FormatBytes(stats?.totalBytes ?? 0, 2)}</div>
</div>
);
}