Resume stream

This commit is contained in:
2023-08-22 20:04:25 +01:00
parent 5a2bd21cec
commit 092804f023
5 changed files with 41 additions and 18 deletions

View File

@ -74,7 +74,8 @@ public class SrsController : Controller
if (req.Action == "on_unpublish") if (req.Action == "on_unpublish")
{ {
await streamManager.StreamStopped(); //bug: ignore on_unpublish
//await streamManager.StreamStopped();
return new(); return new();
} }

View File

@ -25,26 +25,25 @@ public class BackgroundStreamManager : BackgroundService
var streamManager = scope.ServiceProvider.GetRequiredService<StreamManagerFactory>(); var streamManager = scope.ServiceProvider.GetRequiredService<StreamManagerFactory>();
var db = scope.ServiceProvider.GetRequiredService<StreamerContext>(); var db = scope.ServiceProvider.GetRequiredService<StreamerContext>();
var srs = scope.ServiceProvider.GetRequiredService<SrsApi>();
var recentlyEnded = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(5));
var liveStreams = await db.Streams var liveStreams = await db.Streams
.AsNoTracking() .AsNoTracking()
.Where(a => a.State == UserStreamState.Live || a.Ends > recentlyEnded) .Where(a => a.State == UserStreamState.Live)
.Select(a => a.Id) .Select(a => a.Id)
.ToListAsync(cancellationToken: stoppingToken); .ToListAsync(cancellationToken: stoppingToken);
foreach (var id in liveStreams) foreach (var id in liveStreams)
{ {
var manager = await streamManager.ForStream(id); var manager = await streamManager.ForStream(id);
var client = await srs.GetStream(manager.GetStream().StreamId); var lastSegment = await manager.GetLatestRecordingSegment();
if (client != default) var timeoutStream = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(2)) > lastSegment?.Timestamp;
if (timeoutStream)
{ {
await manager.UpdateViewers(); await manager.StreamStopped();
} }
else else
{ {
await manager.StreamStopped(); await manager.UpdateViewers();
} }
} }
} }

View File

@ -80,4 +80,10 @@ public interface IStreamManager
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
Task<List<UserStreamRecording>> GetRecordings(); Task<List<UserStreamRecording>> GetRecordings();
/// <summary>
/// Return the last added recording segment
/// </summary>
/// <returns></returns>
Task<UserStreamRecording?> GetLatestRecordingSegment();
} }

View File

@ -142,6 +142,14 @@ public class NostrStreamManager : IStreamManager
.ToListAsync(); .ToListAsync();
} }
public async Task<UserStreamRecording?> GetLatestRecordingSegment()
{
return await _context.Db.Recordings.AsNoTracking()
.Where(a => a.UserStreamId == _context.UserStream.Id)
.OrderByDescending(a => a.Timestamp)
.FirstOrDefaultAsync();
}
public async Task UpdateViewers() public async Task UpdateViewers()
{ {
if (_context.UserStream.State is not UserStreamState.Live) return; if (_context.UserStream.State is not UserStreamState.Live) return;

View File

@ -37,11 +37,6 @@ public class StreamManagerFactory
if (ep == default) throw new Exception("No endpoint found"); if (ep == default) throw new Exception("No endpoint found");
if (await _db.Streams.CountAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey) != 0)
{
throw new Exception("Cannot start a new stream when already live");
}
if (user.Balance <= 0) if (user.Balance <= 0)
{ {
throw new LowBalanceException("Cannot start stream with empty balance"); throw new LowBalanceException("Cannot start stream with empty balance");
@ -52,7 +47,10 @@ public class StreamManagerFactory
throw new Exception("TOS not accepted"); throw new Exception("TOS not accepted");
} }
var stream = new UserStream var existingLive = await _db.Streams
.SingleOrDefaultAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey);
var stream = existingLive ?? new UserStream
{ {
EndpointId = ep.Id, EndpointId = ep.Id,
PubKey = user.PubKey, PubKey = user.PubKey,
@ -62,10 +60,21 @@ public class StreamManagerFactory
ForwardClientId = info.ClientId ForwardClientId = info.ClientId
}; };
// add new stream
if (existingLive == default)
{
var ev = _eventBuilder.CreateStreamEvent(user, stream); var ev = _eventBuilder.CreateStreamEvent(user, stream);
stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings);
_db.Streams.Add(stream); _db.Streams.Add(stream);
await _db.SaveChangesAsync(); await _db.SaveChangesAsync();
}
else
{
// resume stream, update edge forward info
existingLive.EdgeIp = info.EdgeIp;
existingLive.ForwardClientId = info.ClientId;
await _db.SaveChangesAsync();
}
var ctx = new StreamManagerContext var ctx = new StreamManagerContext
{ {