Kick stream when low balance

This commit is contained in:
2023-07-05 12:00:28 +01:00
parent 00c918b1f6
commit 103ba9615f
3 changed files with 145 additions and 4 deletions

View File

@ -48,6 +48,7 @@ internal static class Program
// streaming services // streaming services
services.AddTransient<StreamManager>(); services.AddTransient<StreamManager>();
services.AddTransient<SrsApi>();
// lnd services // lnd services
services.AddSingleton<LndNode>(); services.AddSingleton<LndNode>();

View File

@ -0,0 +1,130 @@
using Newtonsoft.Json;
namespace NostrStreamer.Services;
public class SrsApi
{
private readonly HttpClient _client;
public SrsApi(HttpClient client, Config config)
{
_client = client;
_client.BaseAddress = config.SrsApiHost;
}
public async Task<List<Stream>> ListStreams()
{
var rsp = await _client.GetFromJsonAsync<StreamsResponse>("/api/v1/streams");
return rsp!.Streams;
}
public async Task KickClient(string clientId)
{
await _client.SendAsync(new HttpRequestMessage(HttpMethod.Delete, $"/api/v1/clients/{clientId}"));
}
}
public class Audio
{
[JsonProperty("codec")]
public string Codec { get; set; }
[JsonProperty("sample_rate")]
public int? SampleRate { get; set; }
[JsonProperty("channel")]
public int? Channel { get; set; }
[JsonProperty("profile")]
public string Profile { get; set; }
}
public class Kbps
{
[JsonProperty("recv_30s")]
public int? Recv30s { get; set; }
[JsonProperty("send_30s")]
public int? Send30s { get; set; }
}
public class Publish
{
[JsonProperty("active")]
public bool? Active { get; set; }
[JsonProperty("cid")]
public string Cid { get; set; }
}
public class StreamsResponse
{
[JsonProperty("code")]
public int? Code { get; set; }
[JsonProperty("server")]
public string Server { get; set; }
[JsonProperty("streams")]
public List<Stream> Streams { get; set; }
}
public class Stream
{
[JsonProperty("id")]
public string Id { get; set; }
[JsonProperty("name")]
public string Name { get; set; }
[JsonProperty("vhost")]
public string Vhost { get; set; }
[JsonProperty("app")]
public string App { get; set; }
[JsonProperty("live_ms")]
public object LiveMs { get; set; }
[JsonProperty("clients")]
public int? Clients { get; set; }
[JsonProperty("frames")]
public int? Frames { get; set; }
[JsonProperty("send_bytes")]
public int? SendBytes { get; set; }
[JsonProperty("recv_bytes")]
public long? RecvBytes { get; set; }
[JsonProperty("kbps")]
public Kbps Kbps { get; set; }
[JsonProperty("publish")]
public Publish Publish { get; set; }
[JsonProperty("video")]
public Video Video { get; set; }
[JsonProperty("audio")]
public Audio Audio { get; set; }
}
public class Video
{
[JsonProperty("codec")]
public string Codec { get; set; }
[JsonProperty("profile")]
public string Profile { get; set; }
[JsonProperty("level")]
public string Level { get; set; }
[JsonProperty("width")]
public int? Width { get; set; }
[JsonProperty("height")]
public int? Height { get; set; }
}

View File

@ -15,13 +15,15 @@ public class StreamManager
private readonly StreamerContext _db; private readonly StreamerContext _db;
private readonly Config _config; private readonly Config _config;
private readonly INostrClient _nostr; private readonly INostrClient _nostr;
private readonly SrsApi _srsApi;
public StreamManager(ILogger<StreamManager> logger, StreamerContext db, Config config, INostrClient nostr) public StreamManager(ILogger<StreamManager> logger, StreamerContext db, Config config, INostrClient nostr, SrsApi srsApi)
{ {
_logger = logger; _logger = logger;
_db = db; _db = db;
_config = config; _config = config;
_nostr = nostr; _nostr = nostr;
_srsApi = srsApi;
} }
public async Task StreamStarted(string streamKey) public async Task StreamStarted(string streamKey)
@ -56,8 +58,8 @@ public class StreamManager
var user = await GetUserFromStreamKey(streamKey); var user = await GetUserFromStreamKey(streamKey);
if (user == default) throw new Exception("No stream key found"); if (user == default) throw new Exception("No stream key found");
const double rate = 21.0d; const double rate = 21.0d; // 21 sats/min
var cost = Math.Round(duration / 60d * rate); var cost = Math.Round(rate * (duration / 60d));
await _db.Users await _db.Users
.Where(a => a.PubKey == user.PubKey) .Where(a => a.PubKey == user.PubKey)
.ExecuteUpdateAsync(o => o.SetProperty(v => v.Balance, v => v.Balance - cost)); .ExecuteUpdateAsync(o => o.SetProperty(v => v.Balance, v => v.Balance - cost));
@ -65,7 +67,15 @@ public class StreamManager
_logger.LogInformation("Stream consumed {n} seconds for {pubkey} costing {cost} sats", duration, user.PubKey, cost); _logger.LogInformation("Stream consumed {n} seconds for {pubkey} costing {cost} sats", duration, user.PubKey, cost);
if (user.Balance <= 0) if (user.Balance <= 0)
{ {
throw new Exception("User balance empty"); _logger.LogInformation("Kicking stream due to low balance");
var streams = await _srsApi.ListStreams();
var stream = streams.FirstOrDefault(a => a.Name == streamKey);
if (stream == default)
{
throw new Exception("Stream not found, cannot kick");
}
await _srsApi.KickClient(stream.Publish.Cid);
} }
} }