This commit is contained in:
2023-07-29 17:43:19 +01:00
parent 4f690b114d
commit c5c9c54518
24 changed files with 985 additions and 12 deletions

View File

@ -36,6 +36,8 @@ public class Config
public string[] Relays { get; init; } = Array.Empty<string>();
public LndConfig Lnd { get; init; } = null!;
public S3BlobConfig DvrStore { get; init; } = null!;
}
public class LndConfig
@ -46,3 +48,14 @@ public class LndConfig
public string MacaroonPath { get; init; } = null!;
}
public sealed class S3BlobConfig
{
public string Name { get; init; } = null!;
public string AccessKey { get; init; } = null!;
public string SecretKey { get; init; } = null!;
public Uri ServiceUrl { get; init; } = null!;
public string? Region { get; init; }
public string BucketName { get; init; } = "zap-stream-dvr";
public bool DisablePayloadSigning { get; init; }
}

View File

@ -124,7 +124,7 @@ public class PlaylistController : Controller
return;
}
Response.ContentType = "application/x-mpegurl";
Response.ContentType = "application/vnd.apple.mpegurl";
await using var sw = new StreamWriter(Response.Body);
var streams = await _srsApi.ListStreams();
@ -176,6 +176,39 @@ public class PlaylistController : Controller
}
}
[HttpGet("recording/{id:guid}.m3u8")]
public async Task RecordingPlaylist([FromRoute]Guid id)
{
try
{
var streamManager = await _streamManagerFactory.ForStream(id);
var userStream = streamManager.GetStream();
// https://developer.apple.com/documentation/http-live-streaming/video-on-demand-playlist-construction
Response.ContentType = "application/vnd.apple.mpegurl";
await using var sw = new StreamWriter(Response.Body);
await sw.WriteLineAsync("#EXTM3U");
await sw.WriteLineAsync("#EXT-X-PLAYLIST-TYPE:VOD");
await sw.WriteLineAsync("#EXT-X-TARGETDURATION:30");
await sw.WriteLineAsync("#EXT-X-VERSION:4");
await sw.WriteLineAsync("#EXT-X-MEDIA-SEQUENCE:0");
await sw.WriteLineAsync("#EXT-X-INDEPENDENT-SEGMENTS");
foreach (var seg in userStream.Recordings.OrderBy(a => a.Timestamp))
{
await sw.WriteLineAsync($"#EXTINF:{seg.Duration:0.0####},");
await sw.WriteLineAsync($"#EXT-X-PROGRAM-DATE-TIME:{seg.Timestamp:yyyy-MM-ddTHH:mm:ss.fffzzz}");
await sw.WriteLineAsync(seg.Url);
}
await sw.WriteLineAsync("#EXT-X-ENDLIST");
}
catch
{
Response.StatusCode = 404;
}
}
private async Task<string?> GetHlsCtx(UserStream stream)
{
var path = $"/{stream.Endpoint.App}/source/{stream.User.StreamKey}.m3u8";

View File

@ -9,11 +9,13 @@ public class SrsController : Controller
{
private readonly ILogger<SrsController> _logger;
private readonly StreamManagerFactory _streamManagerFactory;
private readonly Config _config;
public SrsController(ILogger<SrsController> logger, StreamManagerFactory streamManager)
public SrsController(ILogger<SrsController> logger, StreamManagerFactory streamManager, Config config)
{
_logger = logger;
_streamManagerFactory = streamManager;
_config = config;
}
[HttpPost]
@ -78,6 +80,12 @@ public class SrsController : Controller
await streamManager.ConsumeQuota(req.Duration.Value);
return new();
}
if (req.Action == "on_dvr" && !string.IsNullOrEmpty(req.File))
{
await streamManager.OnDvr(new Uri(_config.SrsHttpHost, $"{req.App}/{Path.GetFileName(req.File)}"));
return new();
}
}
else
{
@ -139,4 +147,7 @@ public class SrsHook
[JsonProperty("duration")]
public double? Duration { get; init; }
[JsonProperty("file")]
public string? File { get; init; }
}

View File

@ -0,0 +1,24 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
namespace NostrStreamer.Database.Configuration;
public class UserStreamRecordingConfiguration : IEntityTypeConfiguration<UserStreamRecording>
{
public void Configure(EntityTypeBuilder<UserStreamRecording> builder)
{
builder.HasKey(a => a.Id);
builder.Property(a => a.Url)
.IsRequired();
builder.Property(a => a.Timestamp)
.IsRequired();
builder.Property(a => a.Duration)
.IsRequired();
builder.HasOne(a => a.Stream)
.WithMany(a => a.Recordings)
.HasForeignKey(a => a.UserStreamId);
}
}

View File

@ -27,4 +27,6 @@ public class StreamerContext : DbContext
public DbSet<UserStreamGuest> Guests => Set<UserStreamGuest>();
public DbSet<IngestEndpoint> Endpoints => Set<IngestEndpoint>();
public DbSet<UserStreamRecording> Recordings => Set<UserStreamRecording>();
}

View File

@ -29,6 +29,7 @@ public class UserStream
public IngestEndpoint Endpoint { get; init; } = null!;
public List<UserStreamGuest> Guests { get; init; } = new();
public List<UserStreamRecording> Recordings { get; init; } = new();
}
public enum UserStreamState

View File

@ -0,0 +1,15 @@
namespace NostrStreamer.Database;
public class UserStreamRecording
{
public Guid Id { get; init; } = Guid.NewGuid();
public Guid UserStreamId { get; init; }
public UserStream Stream { get; init; } = null!;
public string Url { get; init; } = null!;
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public double Duration { get; init; }
}

View File

@ -1,3 +1,6 @@
using Amazon;
using Amazon.Runtime;
using Amazon.S3;
using Newtonsoft.Json;
using Nostr.Client.Json;
using Nostr.Client.Keys;
@ -29,6 +32,18 @@ public static class Extensions
.Where(a => a.StartsWith("variant"))
.Select(Variant.FromString).ToList();
}
public static AmazonS3Client CreateClient(this S3BlobConfig c)
{
return new AmazonS3Client(new BasicAWSCredentials(c.AccessKey, c.SecretKey),
new AmazonS3Config
{
RegionEndpoint = !string.IsNullOrEmpty(c.Region) ? RegionEndpoint.GetBySystemName(c.Region) : null,
ServiceURL = c.ServiceUrl.ToString(),
UseHttp = c.ServiceUrl.Scheme == "http",
ForcePathStyle = true
});
}
}
public class Variant

View File

@ -0,0 +1,301 @@
// <auto-generated />
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using NostrStreamer.Database;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace NostrStreamer.Migrations
{
[DbContext(typeof(StreamerContext))]
[Migration("20230727224032_DVR")]
partial class DVR
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "7.0.8")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("App")
.IsRequired()
.HasColumnType("text");
b.Property<List<string>>("Capabilities")
.IsRequired()
.HasColumnType("text[]");
b.Property<int>("Cost")
.HasColumnType("integer");
b.Property<string>("Forward")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Name")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("App")
.IsUnique();
b.ToTable("Endpoints");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.Property<string>("PaymentHash")
.HasColumnType("text");
b.Property<decimal>("Amount")
.HasColumnType("numeric(20,0)");
b.Property<DateTime>("Created")
.HasColumnType("timestamp with time zone");
b.Property<string>("Invoice")
.IsRequired()
.HasColumnType("text");
b.Property<bool>("IsPaid")
.HasColumnType("boolean");
b.Property<string>("Nostr")
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<int>("Type")
.HasColumnType("integer");
b.HasKey("PaymentHash");
b.HasIndex("PubKey");
b.ToTable("Payments");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Property<string>("PubKey")
.HasColumnType("text");
b.Property<long>("Balance")
.HasColumnType("bigint");
b.Property<string>("ContentWarning")
.HasColumnType("text");
b.Property<string>("Image")
.HasColumnType("text");
b.Property<string>("StreamKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Summary")
.HasColumnType("text");
b.Property<string>("Tags")
.HasColumnType("text");
b.Property<string>("Title")
.HasColumnType("text");
b.Property<uint>("Version")
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate()
.HasColumnType("xid")
.HasColumnName("xmin");
b.HasKey("PubKey");
b.ToTable("Users");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("ClientId")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("EndpointId")
.HasColumnType("uuid");
b.Property<DateTime?>("Ends")
.HasColumnType("timestamp with time zone");
b.Property<string>("Event")
.IsRequired()
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Recording")
.HasColumnType("text");
b.Property<DateTime>("Starts")
.HasColumnType("timestamp with time zone");
b.Property<int>("State")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("EndpointId");
b.HasIndex("PubKey");
b.ToTable("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Relay")
.HasColumnType("text");
b.Property<string>("Role")
.HasColumnType("text");
b.Property<string>("Sig")
.HasColumnType("text");
b.Property<Guid>("StreamId")
.HasColumnType("uuid");
b.Property<decimal>("ZapSplit")
.HasColumnType("numeric");
b.HasKey("Id");
b.HasIndex("StreamId", "PubKey")
.IsUnique();
b.ToTable("Guests");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<DateTime>("Timestamp")
.HasColumnType("timestamp with time zone");
b.Property<string>("Url")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("UserStreamId")
.HasColumnType("uuid");
b.HasKey("Id");
b.HasIndex("UserStreamId");
b.ToTable("Recordings");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Payments")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint")
.WithMany()
.HasForeignKey("EndpointId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Streams")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Endpoint");
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Guests")
.HasForeignKey("StreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Recordings")
.HasForeignKey("UserStreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Navigation("Payments");
b.Navigation("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Navigation("Guests");
b.Navigation("Recordings");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,47 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace NostrStreamer.Migrations
{
/// <inheritdoc />
public partial class DVR : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "Recordings",
columns: table => new
{
Id = table.Column<Guid>(type: "uuid", nullable: false),
UserStreamId = table.Column<Guid>(type: "uuid", nullable: false),
Url = table.Column<string>(type: "text", nullable: false),
Timestamp = table.Column<DateTime>(type: "timestamp with time zone", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_Recordings", x => x.Id);
table.ForeignKey(
name: "FK_Recordings_Streams_UserStreamId",
column: x => x.UserStreamId,
principalTable: "Streams",
principalColumn: "Id",
onDelete: ReferentialAction.Cascade);
});
migrationBuilder.CreateIndex(
name: "IX_Recordings_UserStreamId",
table: "Recordings",
column: "UserStreamId");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "Recordings");
}
}
}

View File

@ -0,0 +1,304 @@
// <auto-generated />
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using NostrStreamer.Database;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace NostrStreamer.Migrations
{
[DbContext(typeof(StreamerContext))]
[Migration("20230728103832_RecordingDuration")]
partial class RecordingDuration
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "7.0.8")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("App")
.IsRequired()
.HasColumnType("text");
b.Property<List<string>>("Capabilities")
.IsRequired()
.HasColumnType("text[]");
b.Property<int>("Cost")
.HasColumnType("integer");
b.Property<string>("Forward")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Name")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("App")
.IsUnique();
b.ToTable("Endpoints");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.Property<string>("PaymentHash")
.HasColumnType("text");
b.Property<decimal>("Amount")
.HasColumnType("numeric(20,0)");
b.Property<DateTime>("Created")
.HasColumnType("timestamp with time zone");
b.Property<string>("Invoice")
.IsRequired()
.HasColumnType("text");
b.Property<bool>("IsPaid")
.HasColumnType("boolean");
b.Property<string>("Nostr")
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<int>("Type")
.HasColumnType("integer");
b.HasKey("PaymentHash");
b.HasIndex("PubKey");
b.ToTable("Payments");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Property<string>("PubKey")
.HasColumnType("text");
b.Property<long>("Balance")
.HasColumnType("bigint");
b.Property<string>("ContentWarning")
.HasColumnType("text");
b.Property<string>("Image")
.HasColumnType("text");
b.Property<string>("StreamKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Summary")
.HasColumnType("text");
b.Property<string>("Tags")
.HasColumnType("text");
b.Property<string>("Title")
.HasColumnType("text");
b.Property<uint>("Version")
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate()
.HasColumnType("xid")
.HasColumnName("xmin");
b.HasKey("PubKey");
b.ToTable("Users");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("ClientId")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("EndpointId")
.HasColumnType("uuid");
b.Property<DateTime?>("Ends")
.HasColumnType("timestamp with time zone");
b.Property<string>("Event")
.IsRequired()
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Recording")
.HasColumnType("text");
b.Property<DateTime>("Starts")
.HasColumnType("timestamp with time zone");
b.Property<int>("State")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("EndpointId");
b.HasIndex("PubKey");
b.ToTable("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Relay")
.HasColumnType("text");
b.Property<string>("Role")
.HasColumnType("text");
b.Property<string>("Sig")
.HasColumnType("text");
b.Property<Guid>("StreamId")
.HasColumnType("uuid");
b.Property<decimal>("ZapSplit")
.HasColumnType("numeric");
b.HasKey("Id");
b.HasIndex("StreamId", "PubKey")
.IsUnique();
b.ToTable("Guests");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<double>("Duration")
.HasColumnType("double precision");
b.Property<DateTime>("Timestamp")
.HasColumnType("timestamp with time zone");
b.Property<string>("Url")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("UserStreamId")
.HasColumnType("uuid");
b.HasKey("Id");
b.HasIndex("UserStreamId");
b.ToTable("Recordings");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Payments")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint")
.WithMany()
.HasForeignKey("EndpointId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Streams")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Endpoint");
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Guests")
.HasForeignKey("StreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Recordings")
.HasForeignKey("UserStreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Navigation("Payments");
b.Navigation("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Navigation("Guests");
b.Navigation("Recordings");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,29 @@
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace NostrStreamer.Migrations
{
/// <inheritdoc />
public partial class RecordingDuration : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<double>(
name: "Duration",
table: "Recordings",
type: "double precision",
nullable: false,
defaultValue: 0.0);
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropColumn(
name: "Duration",
table: "Recordings");
}
}
}

View File

@ -37,7 +37,7 @@ namespace NostrStreamer.Migrations
.IsRequired()
.HasColumnType("text[]");
b.Property<double>("Cost")
b.Property<int>("Cost")
.HasColumnType("integer");
b.Property<string>("Forward")
@ -204,6 +204,32 @@ namespace NostrStreamer.Migrations
b.ToTable("Guests");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<double>("Duration")
.HasColumnType("double precision");
b.Property<DateTime>("Timestamp")
.HasColumnType("timestamp with time zone");
b.Property<string>("Url")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("UserStreamId")
.HasColumnType("uuid");
b.HasKey("Id");
b.HasIndex("UserStreamId");
b.ToTable("Recordings");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.HasOne("NostrStreamer.Database.User", "User")
@ -245,6 +271,17 @@ namespace NostrStreamer.Migrations
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Recordings")
.HasForeignKey("UserStreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Navigation("Payments");
@ -255,6 +292,8 @@ namespace NostrStreamer.Migrations
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Navigation("Guests");
b.Navigation("Recordings");
});
#pragma warning restore 612, 618
}

View File

@ -29,6 +29,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.S3" Version="3.7.201.3" />
<PackageReference Include="FFMpegCore" Version="5.1.0" />
<PackageReference Include="Google.Protobuf" Version="3.23.3" />
<PackageReference Include="Grpc.Net.Client" Version="2.54.0" />

View File

@ -5,6 +5,7 @@ using Microsoft.EntityFrameworkCore;
using Nostr.Client.Client;
using NostrStreamer.Database;
using NostrStreamer.Services;
using NostrStreamer.Services.Dvr;
using NostrStreamer.Services.StreamManager;
namespace NostrStreamer;
@ -58,6 +59,7 @@ internal static class Program
services.AddTransient<UserService>();
services.AddTransient<ThumbnailService>();
services.AddHostedService<ThumbnailGenerator>();
services.AddTransient<IDvrStore, S3DvrStore>();
// lnd services
services.AddSingleton<LndNode>();

View File

@ -0,0 +1,13 @@
namespace NostrStreamer.Services.Dvr;
public interface IDvrStore
{
/// <summary>
/// Upload a DVR recording to storage and return the URL
/// </summary>
/// <param name="source"></param>
/// <returns></returns>
Task<UploadResult> UploadRecording(Uri source);
}
public record UploadResult(Uri Result, double Duration);

View File

@ -0,0 +1,58 @@
using Amazon.S3;
using Amazon.S3.Model;
using FFMpegCore;
namespace NostrStreamer.Services.Dvr;
public class S3DvrStore : IDvrStore
{
private readonly AmazonS3Client _client;
private readonly S3BlobConfig _config;
private readonly HttpClient _httpClient;
public S3DvrStore(Config config, HttpClient httpClient)
{
_httpClient = httpClient;
_config = config.DvrStore;
_client = config.DvrStore.CreateClient();
}
public async Task<UploadResult> UploadRecording(Uri source)
{
var tmpFile = Path.GetTempFileName();
var recordingId = Guid.NewGuid();
var dvrSeg = await _httpClient.GetStreamAsync(source);
await using var fs = new FileStream(tmpFile, FileMode.Create, FileAccess.ReadWrite);
await dvrSeg.CopyToAsync(fs);
fs.Seek(0, SeekOrigin.Begin);
var probe = await FFProbe.AnalyseAsync(tmpFile);
fs.Seek(0, SeekOrigin.Begin);
var key = $"{recordingId}.mp4";
await _client.PutObjectAsync(new PutObjectRequest
{
BucketName = _config.BucketName,
Key = key,
InputStream = fs,
AutoCloseStream = false,
AutoResetStreamPosition = false,
ContentType = "video/mp4",
DisablePayloadSigning = _config.DisablePayloadSigning
});
var url = _client.GetPreSignedURL(new()
{
BucketName = _config.BucketName,
Key = key,
Expires = new DateTime(3000, 1, 1)
});
var ret = new UriBuilder(url)
{
Scheme = _config.ServiceUrl.Scheme
};
return new(ret.Uri, probe.Duration.TotalSeconds);
}
}

View File

@ -37,7 +37,6 @@ public class StreamEventBuilder
new("d", stream.Id.ToString()),
new("title", user.Title ?? ""),
new("summary", user.Summary ?? ""),
new("streaming", new Uri(_config.DataHost, $"{user.PubKey}.m3u8").ToString()),
new("image", string.IsNullOrEmpty(user.Image) ? new Uri(_config.DataHost, $"{stream.Id}.jpg").ToString() : user.Image),
new("status", status),
new("p", user.PubKey, "", "host"),
@ -48,6 +47,7 @@ public class StreamEventBuilder
{
var viewers = _viewCounter.Current(stream.Id);
var starts = new DateTimeOffset(stream.Starts).ToUnixTimeSeconds();
tags.Add(new("streaming", new Uri(_config.DataHost, $"{user.PubKey}.m3u8").ToString()));
tags.Add(new("starts", starts.ToString()));
tags.Add(new("current_participants", viewers.ToString()));
@ -56,6 +56,10 @@ public class StreamEventBuilder
tags.Add(new("content-warning", user.ContentWarning));
}
}
else if (status == "ended")
{
tags.Add(new("recording", new Uri(_config.DataHost, $"recording/{stream.Id}.m3u8").ToString()));
}
foreach (var tag in !string.IsNullOrEmpty(user.Tags) ?
user.Tags.Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) : Array.Empty<string>())

View File

@ -66,4 +66,11 @@ public interface IStreamManager
/// <param name="pubkey"></param>
/// <returns></returns>
Task RemoveGuest(string pubkey);
/// <summary>
/// When a new DVR segment is available
/// </summary>
/// <param name="segment"></param>
/// <returns></returns>
Task OnDvr(Uri segment);
}

View File

@ -1,7 +1,9 @@
using System.Text.RegularExpressions;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using Nostr.Client.Json;
using NostrStreamer.Database;
using NostrStreamer.Services.Dvr;
namespace NostrStreamer.Services.StreamManager;
@ -11,21 +13,23 @@ public class NostrStreamManager : IStreamManager
private readonly StreamManagerContext _context;
private readonly StreamEventBuilder _eventBuilder;
private readonly SrsApi _srsApi;
private readonly IDvrStore _dvrStore;
public NostrStreamManager(ILogger<NostrStreamManager> logger, StreamManagerContext context,
StreamEventBuilder eventBuilder, SrsApi srsApi)
StreamEventBuilder eventBuilder, SrsApi srsApi, IDvrStore dvrStore)
{
_logger = logger;
_context = context;
_eventBuilder = eventBuilder;
_srsApi = srsApi;
_dvrStore = dvrStore;
}
public UserStream GetStream()
{
return _context.UserStream;
}
public Task<List<string>> OnForward()
{
if (_context.User.Balance <= 0)
@ -132,6 +136,22 @@ public class NostrStreamManager : IStreamManager
.ExecuteDeleteAsync();
}
public async Task OnDvr(Uri segment)
{
var matches = new Regex("\\.(\\d+)\\.[\\w]{2,4}$").Match(segment.AbsolutePath);
var result = await _dvrStore.UploadRecording(segment);
_context.Db.Recordings.Add(new()
{
UserStreamId = _context.UserStream.Id,
Url = result.Result.ToString(),
Duration = result.Duration,
Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(matches.Groups[1].Value)).UtcDateTime
});
await _context.Db.SaveChangesAsync();
}
public async Task UpdateViewers()
{
if (_context.UserStream.State is not UserStreamState.Live) return;

View File

@ -2,6 +2,7 @@ using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using Nostr.Client.Json;
using NostrStreamer.Database;
using NostrStreamer.Services.Dvr;
namespace NostrStreamer.Services.StreamManager;
@ -11,14 +12,16 @@ public class StreamManagerFactory
private readonly ILoggerFactory _loggerFactory;
private readonly StreamEventBuilder _eventBuilder;
private readonly SrsApi _srsApi;
private readonly IDvrStore _dvrStore;
public StreamManagerFactory(StreamerContext db, ILoggerFactory loggerFactory, StreamEventBuilder eventBuilder,
SrsApi srsApi)
SrsApi srsApi, IDvrStore dvrStore)
{
_db = db;
_loggerFactory = loggerFactory;
_eventBuilder = eventBuilder;
_srsApi = srsApi;
_dvrStore = dvrStore;
}
public async Task<IStreamManager> ForStream(Guid id)
@ -27,6 +30,7 @@ public class StreamManagerFactory
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a => a.Id == id);
if (currentStream == default) throw new Exception("No live stream");
@ -37,15 +41,16 @@ public class StreamManagerFactory
UserStream = currentStream
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi);
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore);
}
public async Task<IStreamManager> ForCurrentStream(string pubkey)
{
var currentStream = await _db.Streams
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a => a.PubKey.Equals(pubkey) && a.State == UserStreamState.Live);
if (currentStream == default) throw new Exception("No live stream");
@ -56,7 +61,7 @@ public class StreamManagerFactory
UserStream = currentStream
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi);
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore);
}
public async Task<IStreamManager> ForStream(StreamInfo info)
@ -65,6 +70,7 @@ public class StreamManagerFactory
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a =>
a.ClientId.Equals(info.ClientId) &&
a.User.StreamKey.Equals(info.StreamKey) &&
@ -118,6 +124,6 @@ public class StreamManagerFactory
UserStream = stream
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi);
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore);
}
}

View File

@ -24,6 +24,11 @@
"Endpoint": "https://localhost:10002",
"CertPath": "/Users/kieran/.polar/networks/1/volumes/lnd/bob/tls.cert",
"MacaroonPath": "/Users/kieran/.polar/networks/1/volumes/lnd/bob/data/chain/bitcoin/regtest/admin.macaroon"
},
"DvrStore": {
"ServiceUrl": "http://localhost:9010",
"AccessKey": "TQcxug1ZAXfnZ5bvc9n5",
"SecretKey": "p7EK4qew6DBkBPqrpRPuJgTOc6ChUlfIcEdAwE7K"
}
}
}

View File

@ -1,3 +1,5 @@
volumes:
minio-dvr:
services:
srs-origin:
image: ossrs/srs:5
@ -26,4 +28,16 @@ services:
environment:
- "POSTGRES_HOST_AUTH_METHOD=trust"
ports:
- "5431:5432"
- "5431:5432"
minio:
image: quay.io/minio/minio
command:
- "server"
- "/data"
- "--console-address"
- ":9001"
ports:
- "9010:9000"
- "9011:9001"
volumes:
- "minio-dvr:/data"

View File

@ -35,5 +35,14 @@ vhost hls.zap.stream {
on_publish http://10.100.2.226:5295/api/srs;
on_unpublish http://10.100.2.226:5295/api/srs;
on_hls http://10.100.2.226:5295/api/srs;
on_dvr http://10.100.2.226:5295/api/srs;
}
dvr {
enabled on;
dvr_path ./objs/nginx/html/[app]/[stream].[timestamp].mp4;
dvr_plan segment;
dvr_duration 30;
dvr_wait_keyframe on;
}
}