diff --git a/NostrStreamer/Controllers/SRSController.cs b/NostrStreamer/Controllers/SRSController.cs index fe64adf..92222d4 100644 --- a/NostrStreamer/Controllers/SRSController.cs +++ b/NostrStreamer/Controllers/SRSController.cs @@ -74,7 +74,8 @@ public class SrsController : Controller if (req.Action == "on_unpublish") { - await streamManager.StreamStopped(); + //bug: ignore on_unpublish + //await streamManager.StreamStopped(); return new(); } diff --git a/NostrStreamer/Services/Background/BackgroundStreamManager.cs b/NostrStreamer/Services/Background/BackgroundStreamManager.cs index eb32795..7423ec0 100644 --- a/NostrStreamer/Services/Background/BackgroundStreamManager.cs +++ b/NostrStreamer/Services/Background/BackgroundStreamManager.cs @@ -25,26 +25,25 @@ public class BackgroundStreamManager : BackgroundService var streamManager = scope.ServiceProvider.GetRequiredService(); var db = scope.ServiceProvider.GetRequiredService(); - var srs = scope.ServiceProvider.GetRequiredService(); - var recentlyEnded = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(5)); var liveStreams = await db.Streams .AsNoTracking() - .Where(a => a.State == UserStreamState.Live || a.Ends > recentlyEnded) + .Where(a => a.State == UserStreamState.Live) .Select(a => a.Id) .ToListAsync(cancellationToken: stoppingToken); foreach (var id in liveStreams) { var manager = await streamManager.ForStream(id); - var client = await srs.GetStream(manager.GetStream().StreamId); - if (client != default) + var lastSegment = await manager.GetLatestRecordingSegment(); + var timeoutStream = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(2)) > lastSegment?.Timestamp; + if (timeoutStream) { - await manager.UpdateViewers(); + await manager.StreamStopped(); } else { - await manager.StreamStopped(); + await manager.UpdateViewers(); } } } diff --git a/NostrStreamer/Services/StreamManager/IStreamManager.cs b/NostrStreamer/Services/StreamManager/IStreamManager.cs index 5945486..67804ca 100644 --- a/NostrStreamer/Services/StreamManager/IStreamManager.cs +++ b/NostrStreamer/Services/StreamManager/IStreamManager.cs @@ -80,4 +80,10 @@ public interface IStreamManager /// /// Task> GetRecordings(); + + /// + /// Return the last added recording segment + /// + /// + Task GetLatestRecordingSegment(); } \ No newline at end of file diff --git a/NostrStreamer/Services/StreamManager/NostrStreamManager.cs b/NostrStreamer/Services/StreamManager/NostrStreamManager.cs index a9ed748..222eded 100644 --- a/NostrStreamer/Services/StreamManager/NostrStreamManager.cs +++ b/NostrStreamer/Services/StreamManager/NostrStreamManager.cs @@ -142,6 +142,14 @@ public class NostrStreamManager : IStreamManager .ToListAsync(); } + public async Task GetLatestRecordingSegment() + { + return await _context.Db.Recordings.AsNoTracking() + .Where(a => a.UserStreamId == _context.UserStream.Id) + .OrderByDescending(a => a.Timestamp) + .FirstOrDefaultAsync(); + } + public async Task UpdateViewers() { if (_context.UserStream.State is not UserStreamState.Live) return; diff --git a/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs b/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs index 30f5faa..ab8bbd7 100644 --- a/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs +++ b/NostrStreamer/Services/StreamManager/StreamManagerFactory.cs @@ -37,11 +37,6 @@ public class StreamManagerFactory if (ep == default) throw new Exception("No endpoint found"); - if (await _db.Streams.CountAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey) != 0) - { - throw new Exception("Cannot start a new stream when already live"); - } - if (user.Balance <= 0) { throw new LowBalanceException("Cannot start stream with empty balance"); @@ -52,7 +47,10 @@ public class StreamManagerFactory throw new Exception("TOS not accepted"); } - var stream = new UserStream + var existingLive = await _db.Streams + .SingleOrDefaultAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey); + + var stream = existingLive ?? new UserStream { EndpointId = ep.Id, PubKey = user.PubKey, @@ -62,10 +60,21 @@ public class StreamManagerFactory ForwardClientId = info.ClientId }; - var ev = _eventBuilder.CreateStreamEvent(user, stream); - stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); - _db.Streams.Add(stream); - await _db.SaveChangesAsync(); + // add new stream + if (existingLive == default) + { + var ev = _eventBuilder.CreateStreamEvent(user, stream); + stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings); + _db.Streams.Add(stream); + await _db.SaveChangesAsync(); + } + else + { + // resume stream, update edge forward info + existingLive.EdgeIp = info.EdgeIp; + existingLive.ForwardClientId = info.ClientId; + await _db.SaveChangesAsync(); + } var ctx = new StreamManagerContext {