Kick from edge forward

This commit is contained in:
2023-08-01 11:25:31 +01:00
parent 9a8aca72ab
commit 0634622c4d
20 changed files with 523 additions and 74 deletions

View File

@ -0,0 +1,24 @@
using Microsoft.AspNetCore.Mvc;
using NostrStreamer.Services.StreamManager;
namespace NostrStreamer.Controllers;
[Route("/api/admin")]
public class AdminController : Controller
{
private readonly ILogger<AdminController> _logger;
private readonly StreamManagerFactory _streamManagerFactory;
public AdminController(ILogger<AdminController> logger, StreamManagerFactory streamManagerFactory)
{
_logger = logger;
_streamManagerFactory = streamManagerFactory;
}
[HttpPatch("stream/{id:guid}")]
public async Task PublishEvent([FromRoute] Guid id)
{
var stream = await _streamManagerFactory.ForStream(id);
await stream.UpdateEvent();
}
}

View File

@ -118,7 +118,7 @@ public class PlaylistController : Controller
{ {
var streamManager = await _streamManagerFactory.ForCurrentStream(pubkey); var streamManager = await _streamManagerFactory.ForCurrentStream(pubkey);
var userStream = streamManager.GetStream(); var userStream = streamManager.GetStream();
return Redirect($"{userStream.Id}.m3u8"); return Redirect($"stream/{userStream.Id}.m3u8");
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -128,7 +128,7 @@ public class PlaylistController : Controller
return NotFound(); return NotFound();
} }
[HttpGet("{id:guid}.m3u8")] [HttpGet("stream/{id:guid}.m3u8")]
public async Task CreateMultiBitrate([FromRoute] Guid id) public async Task CreateMultiBitrate([FromRoute] Guid id)
{ {
try try

View File

@ -33,19 +33,19 @@ public class SrsController : Controller
} }
var appSplit = req.App.Split("/"); var appSplit = req.App.Split("/");
var streamManager = await _streamManagerFactory.ForStream(new StreamInfo var info = new StreamInfo
{ {
App = appSplit[0], App = appSplit[0],
Variant = appSplit.Length > 1 ? appSplit[1] : "", Variant = appSplit.Length > 1 ? appSplit[1] : "",
ClientId = req.ClientId!, ClientId = req.ClientId!,
StreamId = req.StreamId ?? req.ClientId!,
StreamKey = req.Stream, StreamKey = req.Stream,
EdgeIp = req.Ip! EdgeIp = req.Ip!
}); };
if (req.Action == "on_forward") if (req.Action == "on_forward")
{ {
var urls = await streamManager.OnForward(); var newStream = await _streamManagerFactory.CreateStream(info);
var urls = await newStream.OnForward();
if (urls.Count > 0) if (urls.Count > 0)
{ {
return new SrsForwardHookReply return new SrsForwardHookReply
@ -63,6 +63,7 @@ public class SrsController : Controller
}; };
} }
var streamManager = await _streamManagerFactory.ForStream(info);
if (req.App.EndsWith("/source")) if (req.App.EndsWith("/source"))
{ {
if (req.Action == "on_publish") if (req.Action == "on_publish")

View File

@ -24,6 +24,12 @@ public class UserStreamConfiguration : IEntityTypeConfiguration<UserStream>
builder.Property(a => a.Recording); builder.Property(a => a.Recording);
builder.Property(a => a.EdgeIp)
.IsRequired();
builder.Property(a => a.ForwardClientId)
.IsRequired();
builder.HasOne(a => a.Endpoint) builder.HasOne(a => a.Endpoint)
.WithMany() .WithMany()
.HasForeignKey(a => a.EndpointId); .HasForeignKey(a => a.EndpointId);

View File

@ -28,6 +28,16 @@ public class UserStream
public Guid EndpointId { get; init; } public Guid EndpointId { get; init; }
public IngestEndpoint Endpoint { get; init; } = null!; public IngestEndpoint Endpoint { get; init; } = null!;
/// <summary>
/// Publisher edge IP
/// </summary>
public string EdgeIp { get; set; } = null!;
/// <summary>
/// Publisher edge client id
/// </summary>
public string ForwardClientId { get; set; } = null!;
public List<UserStreamGuest> Guests { get; init; } = new(); public List<UserStreamGuest> Guests { get; init; } = new();
public List<UserStreamRecording> Recordings { get; init; } = new(); public List<UserStreamRecording> Recordings { get; init; } = new();
} }

View File

@ -0,0 +1,312 @@
// <auto-generated />
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using NostrStreamer.Database;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace NostrStreamer.Migrations
{
[DbContext(typeof(StreamerContext))]
[Migration("20230731145446_ForwardClientDetails")]
partial class ForwardClientDetails
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "7.0.8")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("App")
.IsRequired()
.HasColumnType("text");
b.Property<List<string>>("Capabilities")
.IsRequired()
.HasColumnType("text[]");
b.Property<int>("Cost")
.HasColumnType("integer");
b.Property<string>("Forward")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Name")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("App")
.IsUnique();
b.ToTable("Endpoints");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.Property<string>("PaymentHash")
.HasColumnType("text");
b.Property<decimal>("Amount")
.HasColumnType("numeric(20,0)");
b.Property<DateTime>("Created")
.HasColumnType("timestamp with time zone");
b.Property<string>("Invoice")
.IsRequired()
.HasColumnType("text");
b.Property<bool>("IsPaid")
.HasColumnType("boolean");
b.Property<string>("Nostr")
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<int>("Type")
.HasColumnType("integer");
b.HasKey("PaymentHash");
b.HasIndex("PubKey");
b.ToTable("Payments");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Property<string>("PubKey")
.HasColumnType("text");
b.Property<long>("Balance")
.HasColumnType("bigint");
b.Property<string>("ContentWarning")
.HasColumnType("text");
b.Property<string>("Image")
.HasColumnType("text");
b.Property<string>("StreamKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Summary")
.HasColumnType("text");
b.Property<string>("Tags")
.HasColumnType("text");
b.Property<string>("Title")
.HasColumnType("text");
b.Property<uint>("Version")
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate()
.HasColumnType("xid")
.HasColumnName("xmin");
b.HasKey("PubKey");
b.ToTable("Users");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("EdgeIp")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("EndpointId")
.HasColumnType("uuid");
b.Property<DateTime?>("Ends")
.HasColumnType("timestamp with time zone");
b.Property<string>("Event")
.IsRequired()
.HasColumnType("text");
b.Property<string>("ForwardClientId")
.IsRequired()
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Recording")
.HasColumnType("text");
b.Property<DateTime>("Starts")
.HasColumnType("timestamp with time zone");
b.Property<int>("State")
.HasColumnType("integer");
b.Property<string>("StreamId")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("EndpointId");
b.HasIndex("PubKey");
b.ToTable("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Relay")
.HasColumnType("text");
b.Property<string>("Role")
.HasColumnType("text");
b.Property<string>("Sig")
.HasColumnType("text");
b.Property<Guid>("StreamId")
.HasColumnType("uuid");
b.Property<decimal>("ZapSplit")
.HasColumnType("numeric");
b.HasKey("Id");
b.HasIndex("StreamId", "PubKey")
.IsUnique();
b.ToTable("Guests");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<double>("Duration")
.HasColumnType("double precision");
b.Property<DateTime>("Timestamp")
.HasColumnType("timestamp with time zone");
b.Property<string>("Url")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("UserStreamId")
.HasColumnType("uuid");
b.HasKey("Id");
b.HasIndex("UserStreamId");
b.ToTable("Recordings");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Payments")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint")
.WithMany()
.HasForeignKey("EndpointId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Streams")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Endpoint");
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Guests")
.HasForeignKey("StreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Recordings")
.HasForeignKey("UserStreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Navigation("Payments");
b.Navigation("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Navigation("Guests");
b.Navigation("Recordings");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,40 @@
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace NostrStreamer.Migrations
{
/// <inheritdoc />
public partial class ForwardClientDetails : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<string>(
name: "EdgeIp",
table: "Streams",
type: "text",
nullable: false,
defaultValue: "");
migrationBuilder.AddColumn<string>(
name: "ForwardClientId",
table: "Streams",
type: "text",
nullable: false,
defaultValue: "");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropColumn(
name: "EdgeIp",
table: "Streams");
migrationBuilder.DropColumn(
name: "ForwardClientId",
table: "Streams");
}
}
}

View File

@ -135,6 +135,10 @@ namespace NostrStreamer.Migrations
.ValueGeneratedOnAdd() .ValueGeneratedOnAdd()
.HasColumnType("uuid"); .HasColumnType("uuid");
b.Property<string>("EdgeIp")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("EndpointId") b.Property<Guid>("EndpointId")
.HasColumnType("uuid"); .HasColumnType("uuid");
@ -145,6 +149,10 @@ namespace NostrStreamer.Migrations
.IsRequired() .IsRequired()
.HasColumnType("text"); .HasColumnType("text");
b.Property<string>("ForwardClientId")
.IsRequired()
.HasColumnType("text");
b.Property<string>("PubKey") b.Property<string>("PubKey")
.IsRequired() .IsRequired()
.HasColumnType("text"); .HasColumnType("text");

View File

@ -27,9 +27,10 @@ public class BackgroundStreamManager : BackgroundService
var db = scope.ServiceProvider.GetRequiredService<StreamerContext>(); var db = scope.ServiceProvider.GetRequiredService<StreamerContext>();
var srs = scope.ServiceProvider.GetRequiredService<SrsApi>(); var srs = scope.ServiceProvider.GetRequiredService<SrsApi>();
var recentlyEnded = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(5));
var liveStreams = await db.Streams var liveStreams = await db.Streams
.AsNoTracking() .AsNoTracking()
.Where(a => a.State == UserStreamState.Live) .Where(a => a.State == UserStreamState.Live || a.Ends > recentlyEnded)
.Select(a => a.Id) .Select(a => a.Id)
.ToListAsync(cancellationToken: stoppingToken); .ToListAsync(cancellationToken: stoppingToken);

View File

@ -10,6 +10,14 @@ public class SrsApi
{ {
_client = client; _client = client;
_client.BaseAddress = config.SrsApiHost; _client.BaseAddress = config.SrsApiHost;
_client.Timeout = TimeSpan.FromSeconds(5);
}
public SrsApi(HttpClient client, Uri baseAddress)
{
_client = client;
_client.BaseAddress = baseAddress;
_client.Timeout = TimeSpan.FromSeconds(5);
} }
public async Task<List<Stream>> ListStreams() public async Task<List<Stream>> ListStreams()

View File

@ -47,7 +47,7 @@ public class StreamEventBuilder
{ {
var viewers = _viewCounter.Current(stream.Id); var viewers = _viewCounter.Current(stream.Id);
var starts = new DateTimeOffset(stream.Starts).ToUnixTimeSeconds(); var starts = new DateTimeOffset(stream.Starts).ToUnixTimeSeconds();
tags.Add(new("streaming", new Uri(_config.DataHost, $"{stream.Id}.m3u8").ToString())); tags.Add(new("streaming", new Uri(_config.DataHost, $"stream/{stream.Id}.m3u8").ToString()));
tags.Add(new("starts", starts.ToString())); tags.Add(new("starts", starts.ToString()));
tags.Add(new("current_participants", viewers.ToString())); tags.Add(new("current_participants", viewers.ToString()));

View File

@ -73,4 +73,10 @@ public interface IStreamManager
/// <param name="segment"></param> /// <param name="segment"></param>
/// <returns></returns> /// <returns></returns>
Task OnDvr(Uri segment); Task OnDvr(Uri segment);
/// <summary>
/// Republish stream event
/// </summary>
/// <returns></returns>
public Task UpdateEvent();
} }

View File

@ -12,16 +12,14 @@ public class NostrStreamManager : IStreamManager
private readonly ILogger<NostrStreamManager> _logger; private readonly ILogger<NostrStreamManager> _logger;
private readonly StreamManagerContext _context; private readonly StreamManagerContext _context;
private readonly StreamEventBuilder _eventBuilder; private readonly StreamEventBuilder _eventBuilder;
private readonly SrsApi _srsApi;
private readonly IDvrStore _dvrStore; private readonly IDvrStore _dvrStore;
public NostrStreamManager(ILogger<NostrStreamManager> logger, StreamManagerContext context, public NostrStreamManager(ILogger<NostrStreamManager> logger, StreamManagerContext context,
StreamEventBuilder eventBuilder, SrsApi srsApi, IDvrStore dvrStore) StreamEventBuilder eventBuilder, IDvrStore dvrStore)
{ {
_logger = logger; _logger = logger;
_context = context; _context = context;
_eventBuilder = eventBuilder; _eventBuilder = eventBuilder;
_srsApi = srsApi;
_dvrStore = dvrStore; _dvrStore = dvrStore;
} }
@ -87,7 +85,7 @@ public class NostrStreamManager : IStreamManager
if (_context.User.Balance <= 0) if (_context.User.Balance <= 0)
{ {
_logger.LogInformation("Kicking stream due to low balance"); _logger.LogInformation("Kicking stream due to low balance");
await _srsApi.KickClient(_context.StreamInfo.ClientId); await _context.EdgeApi.KickClient(_context.UserStream.ForwardClientId);
} }
} }
@ -152,6 +150,11 @@ public class NostrStreamManager : IStreamManager
await _context.Db.SaveChangesAsync(); await _context.Db.SaveChangesAsync();
} }
public async Task UpdateEvent()
{
await UpdateStreamState(_context.UserStream.State);
}
public async Task UpdateViewers() public async Task UpdateViewers()
{ {
if (_context.UserStream.State is not UserStreamState.Live) return; if (_context.UserStream.State is not UserStreamState.Live) return;

View File

@ -10,7 +10,5 @@ public class StreamInfo
public string ClientId { get; init; } = null!; public string ClientId { get; init; } = null!;
public string StreamId { get; init; } = null!;
public string EdgeIp { get; init; } = null!; public string EdgeIp { get; init; } = null!;
} }

View File

@ -8,4 +8,5 @@ public class StreamManagerContext
public UserStream UserStream { get; init; } = null!; public UserStream UserStream { get; init; } = null!;
public User User => UserStream.User; public User User => UserStream.User;
public StreamInfo? StreamInfo { get; init; } public StreamInfo? StreamInfo { get; init; }
public SrsApi EdgeApi { get; init; } = null!;
} }

View File

@ -11,72 +11,20 @@ public class StreamManagerFactory
private readonly StreamerContext _db; private readonly StreamerContext _db;
private readonly ILoggerFactory _loggerFactory; private readonly ILoggerFactory _loggerFactory;
private readonly StreamEventBuilder _eventBuilder; private readonly StreamEventBuilder _eventBuilder;
private readonly SrsApi _srsApi; private readonly IServiceProvider _serviceProvider;
private readonly IDvrStore _dvrStore; private readonly IDvrStore _dvrStore;
public StreamManagerFactory(StreamerContext db, ILoggerFactory loggerFactory, StreamEventBuilder eventBuilder, public StreamManagerFactory(StreamerContext db, ILoggerFactory loggerFactory, StreamEventBuilder eventBuilder,
SrsApi srsApi, IDvrStore dvrStore) IServiceProvider serviceProvider, IDvrStore dvrStore)
{ {
_db = db; _db = db;
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
_eventBuilder = eventBuilder; _eventBuilder = eventBuilder;
_srsApi = srsApi; _serviceProvider = serviceProvider;
_dvrStore = dvrStore; _dvrStore = dvrStore;
} }
public async Task<IStreamManager> ForStream(Guid id) public async Task<IStreamManager> CreateStream(StreamInfo info)
{
var currentStream = await _db.Streams
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a => a.Id == id);
if (currentStream == default) throw new Exception("No live stream");
var ctx = new StreamManagerContext
{
Db = _db,
UserStream = currentStream
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore);
}
public async Task<IStreamManager> ForCurrentStream(string pubkey)
{
var currentStream = await _db.Streams
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a => a.PubKey.Equals(pubkey) && a.State == UserStreamState.Live);
if (currentStream == default) throw new Exception("No live stream");
var ctx = new StreamManagerContext
{
Db = _db,
UserStream = currentStream
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore);
}
public async Task<IStreamManager> ForStream(StreamInfo info)
{
var stream = await _db.Streams
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a =>
a.StreamId.Equals(info.StreamId) &&
a.User.StreamKey.Equals(info.StreamKey) &&
a.Endpoint.App.Equals(info.App));
if (stream == default)
{ {
var user = await _db.Users var user = await _db.Users
.AsNoTracking() .AsNoTracking()
@ -90,41 +38,119 @@ public class StreamManagerFactory
if (ep == default) throw new Exception("No endpoint found"); if (ep == default) throw new Exception("No endpoint found");
// create new stream entry for source only if (await _db.Streams.CountAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey) != 0)
if (info.Variant == "source")
{ {
stream = new() throw new Exception("Cannot start a new stream when already live");
}
if (user.Balance <= 0)
{
throw new Exception("Cannot start stream with empty balance");
}
var stream = new UserStream
{ {
EndpointId = ep.Id, EndpointId = ep.Id,
PubKey = user.PubKey, PubKey = user.PubKey,
StreamId = info.StreamId, StreamId = "",
State = UserStreamState.Planned State = UserStreamState.Live,
EdgeIp = info.EdgeIp,
ForwardClientId = info.ClientId
}; };
var ev = _eventBuilder.CreateStreamEvent(user, stream); var ev = _eventBuilder.CreateStreamEvent(user, stream);
stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings);
_db.Streams.Add(stream); _db.Streams.Add(stream);
await _db.SaveChangesAsync(); await _db.SaveChangesAsync();
var ctx = new StreamManagerContext
{
Db = _db,
UserStream = new()
{
Id = stream.Id,
PubKey = stream.PubKey,
StreamId = stream.StreamId,
State = stream.State,
EdgeIp = stream.EdgeIp,
ForwardClientId = stream.ForwardClientId,
Endpoint = ep,
User = user
},
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _dvrStore);
} }
// replace again with new values public async Task<IStreamManager> ForStream(Guid id)
stream = new()
{ {
Id = stream?.Id ?? Guid.NewGuid(), var stream = await _db.Streams
User = user, .AsNoTracking()
Endpoint = ep, .Include(a => a.User)
StreamId = info.StreamId, .Include(a => a.Endpoint)
State = UserStreamState.Planned, .Include(a => a.Recordings)
.FirstOrDefaultAsync(a => a.Id == id);
if (stream == default) throw new Exception("No live stream");
var ctx = new StreamManagerContext
{
Db = _db,
UserStream = stream,
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
}; };
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _dvrStore);
}
public async Task<IStreamManager> ForCurrentStream(string pubkey)
{
var stream = await _db.Streams
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a => a.PubKey.Equals(pubkey) && a.State == UserStreamState.Live);
if (stream == default) throw new Exception("No live stream");
var ctx = new StreamManagerContext
{
Db = _db,
UserStream = stream,
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _dvrStore);
}
public async Task<IStreamManager> ForStream(StreamInfo info)
{
var stream = await _db.Streams
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.OrderByDescending(a => a.Starts)
.FirstOrDefaultAsync(a =>
a.User.StreamKey.Equals(info.StreamKey) &&
a.Endpoint.App.Equals(info.App) &&
a.State == UserStreamState.Live);
if (stream == default)
{
throw new Exception("No stream found");
} }
var ctx = new StreamManagerContext var ctx = new StreamManagerContext
{ {
Db = _db, Db = _db,
UserStream = stream, UserStream = stream,
StreamInfo = info StreamInfo = info,
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
}; };
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore); return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _dvrStore);
} }
} }

View File

@ -1,3 +1,4 @@
using System.Diagnostics;
using FFMpegCore; using FFMpegCore;
using NostrStreamer.Database; using NostrStreamer.Database;
@ -24,6 +25,7 @@ public class ThumbnailService
var path = MapPath(stream.Id); var path = MapPath(stream.Id);
try try
{ {
var sw = Stopwatch.StartNew();
var cmd = FFMpegArguments var cmd = FFMpegArguments
.FromUrlInput(new Uri(_config.RtmpHost, $"{stream.Endpoint.App}/source/{stream.User.StreamKey}?vhost=hls.zap.stream")) .FromUrlInput(new Uri(_config.RtmpHost, $"{stream.Endpoint.App}/source/{stream.User.StreamKey}?vhost=hls.zap.stream"))
.OutputToFile(path, true, o => { o.ForceFormat("image2").WithCustomArgument("-vframes 1"); }) .OutputToFile(path, true, o => { o.ForceFormat("image2").WithCustomArgument("-vframes 1"); })
@ -31,10 +33,12 @@ public class ThumbnailService
_logger.LogInformation("Running command {cmd}", cmd.Arguments); _logger.LogInformation("Running command {cmd}", cmd.Arguments);
await cmd.ProcessAsynchronously(); await cmd.ProcessAsynchronously();
sw.Stop();
_logger.LogInformation("Generated {id} thumb in {n:#,##0}ms", stream.Id, sw.Elapsed.TotalMilliseconds);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogWarning("Failed to generate thumbnail {msg}", ex.Message); _logger.LogWarning("Failed to generate {id} thumbnail {msg}", stream.Id, ex.Message);
} }
} }

View File

@ -3,7 +3,8 @@
"LogLevel": { "LogLevel": {
"Default": "Information", "Default": "Information",
"Microsoft.AspNetCore": "Warning", "Microsoft.AspNetCore": "Warning",
"Microsoft.EntityFrameworkCore": "Warning" "Microsoft.EntityFrameworkCore": "Warning",
"System.Net.Http.HttpClient": "Error"
} }
}, },
"AllowedHosts": "*", "AllowedHosts": "*",

View File

@ -11,12 +11,12 @@ services:
- "9003:8080" - "9003:8080"
- "9004:8000" - "9004:8000"
srs-edge: srs-edge:
image: ossrs/srs:4 image: ossrs/srs:5
volumes: volumes:
- "./docker/srs-edge.conf:/usr/local/srs/conf/srs.conf" - "./docker/srs-edge.conf:/usr/local/srs/conf/srs.conf"
ports: ports:
- "9005:1935" - "9005:1935"
- "9006:1985" - "1985:1985"
- "9007:8080" - "9007:8080"
- "9008:8000" - "9008:8000"
nostr: nostr:

View File

@ -39,7 +39,7 @@ vhost hls.zap.stream {
} }
dvr { dvr {
enabled on; enabled off;
dvr_path ./objs/nginx/html/[app]/[stream].[timestamp].mp4; dvr_path ./objs/nginx/html/[app]/[stream].[timestamp].mp4;
dvr_plan segment; dvr_plan segment;
dvr_duration 30; dvr_duration 30;