Use stream_id to identify stream

This commit is contained in:
2023-07-31 14:08:54 +01:00
parent 99efab6013
commit 8e18029101
12 changed files with 359 additions and 20 deletions

View File

@ -38,6 +38,7 @@ public class SrsController : Controller
App = appSplit[0],
Variant = appSplit.Length > 1 ? appSplit[1] : "",
ClientId = req.ClientId!,
StreamId = req.StreamId!,
StreamKey = req.Stream
});
@ -61,12 +62,6 @@ public class SrsController : Controller
};
}
if (req.App.EndsWith("720h") && req.Action == "on_hls" && !string.IsNullOrEmpty(req.File))
{
await streamManager.OnDvr(new Uri(_config.SrsHttpHost, $"{req.App}/{Path.GetFileName(req.File)}"));
return new();
}
if (req.App.EndsWith("/source"))
{
if (req.Action == "on_publish")
@ -84,6 +79,8 @@ public class SrsController : Controller
if (req.Action == "on_hls" && req.Duration.HasValue && !string.IsNullOrEmpty(req.ClientId))
{
await streamManager.ConsumeQuota(req.Duration.Value);
await streamManager.OnDvr(new Uri(_config.SrsHttpHost, $"{req.App}/{Path.GetFileName(req.File)}"));
return new();
}
/*if (req.Action == "on_dvr" && !string.IsNullOrEmpty(req.File))
@ -134,6 +131,9 @@ public class SrsHook
[JsonProperty("client_id")]
public string? ClientId { get; set; }
[JsonProperty("stream_id")]
public string? StreamId { get; set; }
[JsonProperty("ip")]
public string? Ip { get; set; }

View File

@ -8,7 +8,7 @@ public class UserStreamConfiguration : IEntityTypeConfiguration<UserStream>
public void Configure(EntityTypeBuilder<UserStream> builder)
{
builder.HasKey(a => a.Id);
builder.Property(a => a.ClientId)
builder.Property(a => a.StreamId)
.IsRequired();
builder.Property(a => a.Starts)

View File

@ -7,7 +7,7 @@ public class UserStream
public string PubKey { get; init; } = null!;
public User User { get; init; } = null!;
public string ClientId { get; init; } = null!;
public string StreamId { get; init; } = null!;
public DateTime Starts { get; init; } = DateTime.UtcNow;

View File

@ -0,0 +1,304 @@
// <auto-generated />
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using NostrStreamer.Database;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace NostrStreamer.Migrations
{
[DbContext(typeof(StreamerContext))]
[Migration("20230731124217_StreamId")]
partial class StreamId
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "7.0.8")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("NostrStreamer.Database.IngestEndpoint", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("App")
.IsRequired()
.HasColumnType("text");
b.Property<List<string>>("Capabilities")
.IsRequired()
.HasColumnType("text[]");
b.Property<int>("Cost")
.HasColumnType("integer");
b.Property<string>("Forward")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Name")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("App")
.IsUnique();
b.ToTable("Endpoints");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.Property<string>("PaymentHash")
.HasColumnType("text");
b.Property<decimal>("Amount")
.HasColumnType("numeric(20,0)");
b.Property<DateTime>("Created")
.HasColumnType("timestamp with time zone");
b.Property<string>("Invoice")
.IsRequired()
.HasColumnType("text");
b.Property<bool>("IsPaid")
.HasColumnType("boolean");
b.Property<string>("Nostr")
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<int>("Type")
.HasColumnType("integer");
b.HasKey("PaymentHash");
b.HasIndex("PubKey");
b.ToTable("Payments");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Property<string>("PubKey")
.HasColumnType("text");
b.Property<long>("Balance")
.HasColumnType("bigint");
b.Property<string>("ContentWarning")
.HasColumnType("text");
b.Property<string>("Image")
.HasColumnType("text");
b.Property<string>("StreamKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Summary")
.HasColumnType("text");
b.Property<string>("Tags")
.HasColumnType("text");
b.Property<string>("Title")
.HasColumnType("text");
b.Property<uint>("Version")
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate()
.HasColumnType("xid")
.HasColumnName("xmin");
b.HasKey("PubKey");
b.ToTable("Users");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<Guid>("EndpointId")
.HasColumnType("uuid");
b.Property<DateTime?>("Ends")
.HasColumnType("timestamp with time zone");
b.Property<string>("Event")
.IsRequired()
.HasColumnType("text");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Recording")
.HasColumnType("text");
b.Property<DateTime>("Starts")
.HasColumnType("timestamp with time zone");
b.Property<int>("State")
.HasColumnType("integer");
b.Property<string>("StreamId")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("EndpointId");
b.HasIndex("PubKey");
b.ToTable("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("PubKey")
.IsRequired()
.HasColumnType("text");
b.Property<string>("Relay")
.HasColumnType("text");
b.Property<string>("Role")
.HasColumnType("text");
b.Property<string>("Sig")
.HasColumnType("text");
b.Property<Guid>("StreamId")
.HasColumnType("uuid");
b.Property<decimal>("ZapSplit")
.HasColumnType("numeric");
b.HasKey("Id");
b.HasIndex("StreamId", "PubKey")
.IsUnique();
b.ToTable("Guests");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<double>("Duration")
.HasColumnType("double precision");
b.Property<DateTime>("Timestamp")
.HasColumnType("timestamp with time zone");
b.Property<string>("Url")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("UserStreamId")
.HasColumnType("uuid");
b.HasKey("Id");
b.HasIndex("UserStreamId");
b.ToTable("Recordings");
});
modelBuilder.Entity("NostrStreamer.Database.Payment", b =>
{
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Payments")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.HasOne("NostrStreamer.Database.IngestEndpoint", "Endpoint")
.WithMany()
.HasForeignKey("EndpointId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("NostrStreamer.Database.User", "User")
.WithMany("Streams")
.HasForeignKey("PubKey")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Endpoint");
b.Navigation("User");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamGuest", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Guests")
.HasForeignKey("StreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.UserStreamRecording", b =>
{
b.HasOne("NostrStreamer.Database.UserStream", "Stream")
.WithMany("Recordings")
.HasForeignKey("UserStreamId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.Navigation("Stream");
});
modelBuilder.Entity("NostrStreamer.Database.User", b =>
{
b.Navigation("Payments");
b.Navigation("Streams");
});
modelBuilder.Entity("NostrStreamer.Database.UserStream", b =>
{
b.Navigation("Guests");
b.Navigation("Recordings");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,28 @@
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace NostrStreamer.Migrations
{
/// <inheritdoc />
public partial class StreamId : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.RenameColumn(
name: "ClientId",
table: "Streams",
newName: "StreamId");
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.RenameColumn(
name: "StreamId",
table: "Streams",
newName: "ClientId");
}
}
}

View File

@ -135,10 +135,6 @@ namespace NostrStreamer.Migrations
.ValueGeneratedOnAdd()
.HasColumnType("uuid");
b.Property<string>("ClientId")
.IsRequired()
.HasColumnType("text");
b.Property<Guid>("EndpointId")
.HasColumnType("uuid");
@ -162,6 +158,10 @@ namespace NostrStreamer.Migrations
b.Property<int>("State")
.HasColumnType("integer");
b.Property<string>("StreamId")
.IsRequired()
.HasColumnType("text");
b.HasKey("Id");
b.HasIndex("EndpointId");

View File

@ -36,7 +36,7 @@ public class BackgroundStreamManager : BackgroundService
foreach (var id in liveStreams)
{
var manager = await streamManager.ForStream(id);
var client = await srs.GetClient(manager.GetStream().ClientId);
var client = await srs.GetStream(manager.GetStream().StreamId);
if (client != default)
{
await manager.UpdateViewers();

View File

@ -1,3 +1,5 @@
using NostrStreamer.Database;
namespace NostrStreamer.Services.Dvr;
public interface IDvrStore
@ -5,9 +7,10 @@ public interface IDvrStore
/// <summary>
/// Upload a DVR recording to storage and return the URL
/// </summary>
/// <param name="stream"></param>
/// <param name="source"></param>
/// <returns></returns>
Task<UploadResult> UploadRecording(Uri source);
Task<UploadResult> UploadRecording(UserStream stream, Uri source);
}
public record UploadResult(Uri Result, double Duration);

View File

@ -87,7 +87,7 @@ public class NostrStreamManager : IStreamManager
if (_context.User.Balance <= 0)
{
_logger.LogInformation("Kicking stream due to low balance");
await _srsApi.KickClient(_context.UserStream.ClientId);
await _srsApi.KickClient(_context.StreamInfo.ClientId);
}
}
@ -140,7 +140,7 @@ public class NostrStreamManager : IStreamManager
{
//var matches = new Regex("\\.(\\d+)\\.[\\w]{2,4}$").Match(segment.AbsolutePath);
var result = await _dvrStore.UploadRecording(segment);
var result = await _dvrStore.UploadRecording(_context.UserStream, segment);
_context.Db.Recordings.Add(new()
{
UserStreamId = _context.UserStream.Id,

View File

@ -9,4 +9,6 @@ public class StreamInfo
public string StreamKey { get; init; } = null!;
public string ClientId { get; init; } = null!;
public string StreamId { get; init; } = null!;
}

View File

@ -7,4 +7,5 @@ public class StreamManagerContext
public StreamerContext Db { get; init; } = null!;
public UserStream UserStream { get; init; } = null!;
public User User => UserStream.User;
public StreamInfo? StreamInfo { get; init; }
}

View File

@ -72,7 +72,7 @@ public class StreamManagerFactory
.Include(a => a.Endpoint)
.Include(a => a.Recordings)
.FirstOrDefaultAsync(a =>
a.ClientId.Equals(info.ClientId) &&
a.StreamId.Equals(info.StreamId) &&
a.User.StreamKey.Equals(info.StreamKey) &&
a.Endpoint.App.Equals(info.App));
@ -97,7 +97,7 @@ public class StreamManagerFactory
{
EndpointId = ep.Id,
PubKey = user.PubKey,
ClientId = info.ClientId,
StreamId = info.StreamId,
State = UserStreamState.Planned
};
@ -113,7 +113,7 @@ public class StreamManagerFactory
Id = stream?.Id ?? Guid.NewGuid(),
User = user,
Endpoint = ep,
ClientId = info.ClientId,
StreamId = info.StreamId,
State = UserStreamState.Planned,
};
}
@ -121,7 +121,8 @@ public class StreamManagerFactory
var ctx = new StreamManagerContext
{
Db = _db,
UserStream = stream
UserStream = stream,
StreamInfo = info
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _eventBuilder, _srsApi, _dvrStore);