feat: stream-keys

This commit is contained in:
2024-08-28 10:02:49 +01:00
parent 015b75f894
commit 97e43c8cbd
27 changed files with 1723 additions and 87 deletions

View File

@ -57,7 +57,7 @@ public class NostrStreamManager : IStreamManager
TestCanStream();
var fwds = new List<string>
{
$"rtmp://127.0.0.1:1935/{_context.UserStream.Endpoint.App}/{_context.User.StreamKey}?vhost={_context.UserStream.Endpoint.Forward}"
$"rtmp://127.0.0.1:1935/{_context.UserStream.Endpoint.App}/{_context.StreamKey}?vhost={_context.UserStream.Endpoint.Forward}"
};
var dataProtector = _dataProtectionProvider.CreateProtector("forward-targets");
@ -103,7 +103,6 @@ public class NostrStreamManager : IStreamManager
}
}
});
}
public async Task StreamStopped()
@ -194,20 +193,27 @@ public class NostrStreamManager : IStreamManager
{
//var matches = new Regex("\\.(\\d+)\\.[\\w]{2,4}$").Match(segment.AbsolutePath);
if (_context.UserStream.Endpoint.Capabilities.Contains("dvr:source"))
try
{
var result = await _dvrStore.UploadRecording(_context.UserStream, segment);
_context.Db.Recordings.Add(new()
if (_context.UserStream.Endpoint.Capabilities.Contains("dvr:source"))
{
Id = result.Id,
UserStreamId = _context.UserStream.Id,
Url = result.Result.ToString(),
Duration = result.Duration,
Timestamp = DateTime
.UtcNow //DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(matches.Groups[1].Value)).UtcDateTime
});
var result = await _dvrStore.UploadRecording(_context.UserStream, segment);
_context.Db.Recordings.Add(new()
{
Id = result.Id,
UserStreamId = _context.UserStream.Id,
Url = result.Result.ToString(),
Duration = result.Duration,
Timestamp = DateTime
.UtcNow //DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(matches.Groups[1].Value)).UtcDateTime
});
await _context.Db.SaveChangesAsync();
await _context.Db.SaveChangesAsync();
}
}
catch (Exception ex)
{
_logger.LogWarning("Failed to save recording segment {}, {}", segment, ex.Message);
}
await _context.Db.Streams
@ -242,7 +248,7 @@ public class NostrStreamManager : IStreamManager
var existingEvent = _context.UserStream.GetEvent();
var oldViewers = existingEvent?.Tags?.FindFirstTagValue("current_participants");
var newEvent = _eventBuilder.CreateStreamEvent(_context.User, _context.UserStream);
var newEvent = _eventBuilder.CreateStreamEvent(_context.UserStream);
var newViewers = newEvent?.Tags?.FindFirstTagValue("current_participants");
if (newEvent != default && int.TryParse(oldViewers, out var a) && int.TryParse(newViewers, out var b) && a != b)
@ -260,7 +266,7 @@ public class NostrStreamManager : IStreamManager
DateTime? ends = state == UserStreamState.Ended ? DateTime.UtcNow : null;
_context.UserStream.State = state;
_context.UserStream.Ends = ends;
var ev = _eventBuilder.CreateStreamEvent(_context.User, _context.UserStream);
var ev = _eventBuilder.CreateStreamEvent(_context.UserStream);
await _context.Db.Streams.Where(a => a.Id == _context.UserStream.Id)
.ExecuteUpdateAsync(o => o.SetProperty(v => v.State, state)

View File

@ -9,4 +9,5 @@ public class StreamManagerContext
public User User => UserStream.User;
public StreamInfo? StreamInfo { get; init; }
public SrsApi EdgeApi { get; init; } = null!;
public string StreamKey { get; init; } = null!;
}

View File

@ -28,7 +28,9 @@ public class StreamManagerFactory
var user = await _db.Users
.AsNoTracking()
.Include(a => a.Forwards)
.SingleOrDefaultAsync(a => a.StreamKey.Equals(info.StreamKey));
.Include(user => user.StreamKeys)
.SingleOrDefaultAsync(a =>
a.StreamKey.Equals(info.StreamKey) || a.StreamKeys.Any(b => b.Key == info.StreamKey));
if (user == default) throw new Exception("No user found");
@ -53,24 +55,28 @@ public class StreamManagerFactory
throw new Exception("User account blocked");
}
var existingLive = await _db.Streams
.SingleOrDefaultAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey);
var singleUseKey = user.StreamKeys.FirstOrDefault(a => a.Key == info.StreamKey);
var existingLive = singleUseKey != default
? await _db.Streams.SingleOrDefaultAsync(a => a.Id == singleUseKey.StreamId)
: await _db.Streams
.SingleOrDefaultAsync(a => a.State == UserStreamState.Live && a.PubKey == user.PubKey);
var stream = existingLive ?? new UserStream
{
EndpointId = ep.Id,
PubKey = user.PubKey,
StreamId = "",
State = UserStreamState.Live,
EdgeIp = info.EdgeIp,
ForwardClientId = info.ClientId
ForwardClientId = info.ClientId,
};
// add new stream
if (existingLive == default)
{
var ev = _eventBuilder.CreateStreamEvent(user, stream);
stream.Event = JsonConvert.SerializeObject(ev, NostrSerializer.Settings);
await stream.CopyLastStreamDetails(_db);
var ev = _eventBuilder.CreateStreamEvent(stream);
stream.Event = NostrJson.Serialize(ev) ?? "";
_db.Streams.Add(stream);
await _db.SaveChangesAsync();
}
@ -85,18 +91,19 @@ public class StreamManagerFactory
var ctx = new StreamManagerContext
{
Db = _db,
StreamKey = info.StreamKey,
UserStream = new()
{
Id = stream.Id,
PubKey = stream.PubKey,
StreamId = stream.StreamId,
State = stream.State,
EdgeIp = stream.EdgeIp,
ForwardClientId = stream.ForwardClientId,
Endpoint = ep,
User = user
},
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(),
new Uri($"http://{stream.EdgeIp}:1985"))
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _serviceProvider);
@ -108,6 +115,7 @@ public class StreamManagerFactory
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.StreamKey)
.FirstOrDefaultAsync(a => a.Id == id);
if (stream == default) throw new Exception("No live stream");
@ -115,8 +123,10 @@ public class StreamManagerFactory
var ctx = new StreamManagerContext
{
Db = _db,
StreamKey = stream.StreamKey?.Key ?? stream.User.StreamKey,
UserStream = stream,
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(),
new Uri($"http://{stream.EdgeIp}:1985"))
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _serviceProvider);
@ -128,6 +138,7 @@ public class StreamManagerFactory
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.StreamKey)
.FirstOrDefaultAsync(a => a.PubKey.Equals(pubkey) && a.State == UserStreamState.Live);
if (stream == default) throw new Exception("No live stream");
@ -135,8 +146,10 @@ public class StreamManagerFactory
var ctx = new StreamManagerContext
{
Db = _db,
StreamKey = stream.StreamKey?.Key ?? stream.User.StreamKey,
UserStream = stream,
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(),
new Uri($"http://{stream.EdgeIp}:1985"))
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _serviceProvider);
@ -148,11 +161,13 @@ public class StreamManagerFactory
.AsNoTracking()
.Include(a => a.User)
.Include(a => a.Endpoint)
.Include(a => a.StreamKey)
.OrderByDescending(a => a.Starts)
.FirstOrDefaultAsync(a =>
a.User.StreamKey.Equals(info.StreamKey) &&
a.Endpoint.App.Equals(info.App) &&
a.State == UserStreamState.Live);
(a.StreamKey != default && a.StreamKey.Key == info.StreamKey) ||
(a.User.StreamKey.Equals(info.StreamKey) &&
a.Endpoint.App.Equals(info.App) &&
a.State == UserStreamState.Live));
if (stream == default)
{
@ -162,11 +177,13 @@ public class StreamManagerFactory
var ctx = new StreamManagerContext
{
Db = _db,
StreamKey = info.StreamKey,
UserStream = stream,
StreamInfo = info,
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(), new Uri($"http://{stream.EdgeIp}:1985"))
EdgeApi = new SrsApi(_serviceProvider.GetRequiredService<HttpClient>(),
new Uri($"http://{stream.EdgeIp}:1985"))
};
return new NostrStreamManager(_loggerFactory.CreateLogger<NostrStreamManager>(), ctx, _serviceProvider);
}
}
}