Implemented daemon side stats streaming. Fixed server task cancellation being too quick. Improved console streaming
This commit is contained in:
@@ -69,14 +69,23 @@ public class Server : IAsyncDisposable
|
|||||||
StateMachine.Configure(ServerState.Installing)
|
StateMachine.Configure(ServerState.Installing)
|
||||||
.Permit(ServerTrigger.FailSafe, ServerState.Offline)
|
.Permit(ServerTrigger.FailSafe, ServerState.Offline)
|
||||||
.Permit(ServerTrigger.Exited, ServerState.Offline);
|
.Permit(ServerTrigger.Exited, ServerState.Offline);
|
||||||
|
|
||||||
// Configure task reset when server goes offline
|
|
||||||
StateMachine.Configure(ServerState.Offline)
|
StateMachine.Configure(ServerState.Offline)
|
||||||
.OnEntryAsync(async () =>
|
.OnEntryAsync(async () =>
|
||||||
{
|
{
|
||||||
|
// Configure task reset when server goes offline
|
||||||
|
|
||||||
if (!TaskCancellationSource.IsCancellationRequested)
|
if (!TaskCancellationSource.IsCancellationRequested)
|
||||||
await TaskCancellationSource.CancelAsync();
|
await TaskCancellationSource.CancelAsync();
|
||||||
|
})
|
||||||
|
.OnExit(() =>
|
||||||
|
{
|
||||||
|
// Activate tasks when the server goes online
|
||||||
|
// If we don't separate the disabling and enabling
|
||||||
|
// of the tasks and would do both it in just the offline handler
|
||||||
|
// we would have edge cases where reconnect loops would already have the new task activated
|
||||||
|
// while they are supposed to shut down. I tested the handling of the state machine,
|
||||||
|
// and it executes on exit before the other listeners from the other sub systems
|
||||||
TaskCancellationSource = new();
|
TaskCancellationSource = new();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -48,65 +48,91 @@ public class ConsoleSubSystem : ServerSubSystem
|
|||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task Attach(string containerId)
|
public Task Attach(string containerId)
|
||||||
{
|
{
|
||||||
Stream = await DockerClient.Containers.AttachContainerAsync(containerId,
|
|
||||||
true,
|
|
||||||
new ContainerAttachParameters()
|
|
||||||
{
|
|
||||||
Stderr = true,
|
|
||||||
Stdin = true,
|
|
||||||
Stdout = true,
|
|
||||||
Stream = true
|
|
||||||
},
|
|
||||||
Server.TaskCancellation
|
|
||||||
);
|
|
||||||
|
|
||||||
// Reading
|
// Reading
|
||||||
Task.Run(async () =>
|
Task.Run(async () =>
|
||||||
{
|
{
|
||||||
|
// This loop is here to reconnect to the container if for some reason the container
|
||||||
|
// attach stream fails before the server tasks have been canceled i.e. the before the server
|
||||||
|
// goes offline
|
||||||
|
|
||||||
while (!Server.TaskCancellation.IsCancellationRequested)
|
while (!Server.TaskCancellation.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
var buffer = new byte[1024];
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var readResult = await Stream.ReadOutputAsync(
|
Stream = await DockerClient.Containers.AttachContainerAsync(containerId,
|
||||||
buffer,
|
true,
|
||||||
0,
|
new ContainerAttachParameters()
|
||||||
buffer.Length,
|
{
|
||||||
|
Stderr = true,
|
||||||
|
Stdin = true,
|
||||||
|
Stdout = true,
|
||||||
|
Stream = true
|
||||||
|
},
|
||||||
Server.TaskCancellation
|
Server.TaskCancellation
|
||||||
);
|
);
|
||||||
|
|
||||||
if (readResult.EOF)
|
var buffer = new byte[1024];
|
||||||
break;
|
|
||||||
|
|
||||||
var resizedBuffer = new byte[readResult.Count];
|
try
|
||||||
Array.Copy(buffer, resizedBuffer, readResult.Count);
|
{
|
||||||
buffer = new byte[buffer.Length];
|
// Read while server tasks are not canceled
|
||||||
|
while (!Server.TaskCancellation.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
var readResult = await Stream.ReadOutputAsync(
|
||||||
|
buffer,
|
||||||
|
0,
|
||||||
|
buffer.Length,
|
||||||
|
Server.TaskCancellation
|
||||||
|
);
|
||||||
|
|
||||||
var decodedText = Encoding.UTF8.GetString(resizedBuffer);
|
if (readResult.EOF)
|
||||||
await WriteOutput(decodedText);
|
break;
|
||||||
|
|
||||||
|
var resizedBuffer = new byte[readResult.Count];
|
||||||
|
Array.Copy(buffer, resizedBuffer, readResult.Count);
|
||||||
|
buffer = new byte[buffer.Length];
|
||||||
|
|
||||||
|
var decodedText = Encoding.UTF8.GetString(resizedBuffer);
|
||||||
|
await WriteOutput(decodedText);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
// Ignored
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
// Ignored
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
Logger.LogWarning("An unhandled error occured while reading from container stream: {e}", e);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
Stream.Dispose();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (TaskCanceledException)
|
catch (TaskCanceledException)
|
||||||
{
|
{
|
||||||
// Ignored
|
// ignored
|
||||||
}
|
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
// Ignored
|
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
Logger.LogWarning("An unhandled error occured while reading from container stream: {e}", e);
|
Logger.LogError("An error occured while attaching to container: {e}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Reset stream so no further inputs will be piped to it
|
// Reset stream so no further inputs will be piped to it
|
||||||
Stream = null;
|
Stream = null;
|
||||||
|
|
||||||
Logger.LogDebug("Disconnected from container stream");
|
Logger.LogDebug("Disconnected from container stream");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task WriteOutput(string output)
|
public async Task WriteOutput(string output)
|
||||||
|
|||||||
@@ -66,7 +66,8 @@ public class ProvisionSubSystem : ServerSubSystem
|
|||||||
// 4. Ensure the docker image has been downloaded
|
// 4. Ensure the docker image has been downloaded
|
||||||
// 5. Create the docker container
|
// 5. Create the docker container
|
||||||
// 6. Attach the console
|
// 6. Attach the console
|
||||||
// 7. Start the container
|
// 7. Attach to stats
|
||||||
|
// 8. Start the container
|
||||||
|
|
||||||
// Define some shared variables:
|
// Define some shared variables:
|
||||||
var containerName = $"moonlight-runtime-{Configuration.Id}";
|
var containerName = $"moonlight-runtime-{Configuration.Id}";
|
||||||
@@ -161,7 +162,13 @@ public class ProvisionSubSystem : ServerSubSystem
|
|||||||
Logger.LogDebug("Attaching console");
|
Logger.LogDebug("Attaching console");
|
||||||
await consoleSubSystem.Attach(CurrentContainerId);
|
await consoleSubSystem.Attach(CurrentContainerId);
|
||||||
|
|
||||||
// 7. Start the docker container
|
// 7. Attach stats stream
|
||||||
|
|
||||||
|
var statsSubSystem = Server.GetRequiredSubSystem<StatsSubSystem>();
|
||||||
|
|
||||||
|
await statsSubSystem.Attach(CurrentContainerId);
|
||||||
|
|
||||||
|
// 8. Start the docker container
|
||||||
|
|
||||||
Logger.LogDebug("Starting docker container");
|
Logger.LogDebug("Starting docker container");
|
||||||
await consoleSubSystem.WriteMoonlight("Starting container");
|
await consoleSubSystem.WriteMoonlight("Starting container");
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ public class RestoreSubSystem : ServerSubSystem
|
|||||||
provisionSubSystem.CurrentContainerId = runtimeContainer.ID;
|
provisionSubSystem.CurrentContainerId = runtimeContainer.ID;
|
||||||
Server.OverrideState(ServerState.Online);
|
Server.OverrideState(ServerState.Online);
|
||||||
|
|
||||||
|
// Update and attach console
|
||||||
|
|
||||||
var consoleSubSystem = Server.GetRequiredSubSystem<ConsoleSubSystem>();
|
var consoleSubSystem = Server.GetRequiredSubSystem<ConsoleSubSystem>();
|
||||||
|
|
||||||
var logStream = await DockerClient.Containers.GetContainerLogsAsync(runtimeContainerName, true, new ()
|
var logStream = await DockerClient.Containers.GetContainerLogsAsync(runtimeContainerName, true, new ()
|
||||||
@@ -53,6 +55,11 @@ public class RestoreSubSystem : ServerSubSystem
|
|||||||
|
|
||||||
await consoleSubSystem.Attach(provisionSubSystem.CurrentContainerId);
|
await consoleSubSystem.Attach(provisionSubSystem.CurrentContainerId);
|
||||||
|
|
||||||
|
// Attach stats
|
||||||
|
var statsSubSystem = Server.GetRequiredSubSystem<StatsSubSystem>();
|
||||||
|
await statsSubSystem.Attach(provisionSubSystem.CurrentContainerId);
|
||||||
|
|
||||||
|
// Done :>
|
||||||
Logger.LogInformation("Restored runtime container successfully");
|
Logger.LogInformation("Restored runtime container successfully");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,141 @@
|
|||||||
|
using Docker.DotNet;
|
||||||
|
using Docker.DotNet.Models;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
using MoonlightServers.Daemon.Http.Hubs;
|
||||||
|
using MoonlightServers.DaemonShared.DaemonSide.Models;
|
||||||
|
|
||||||
|
namespace MoonlightServers.Daemon.ServerSystem.SubSystems;
|
||||||
|
|
||||||
|
public class StatsSubSystem : ServerSubSystem
|
||||||
|
{
|
||||||
|
private readonly DockerClient DockerClient;
|
||||||
|
private readonly IHubContext<ServerWebSocketHub> HubContext;
|
||||||
|
|
||||||
|
public StatsSubSystem(
|
||||||
|
Server server,
|
||||||
|
ILogger logger,
|
||||||
|
DockerClient dockerClient,
|
||||||
|
IHubContext<ServerWebSocketHub> hubContext
|
||||||
|
) : base(server, logger)
|
||||||
|
{
|
||||||
|
DockerClient = dockerClient;
|
||||||
|
HubContext = hubContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Attach(string containerId)
|
||||||
|
{
|
||||||
|
Logger.LogDebug("Attaching to stats stream");
|
||||||
|
|
||||||
|
Task.Run(async () =>
|
||||||
|
{
|
||||||
|
while (!Server.TaskCancellation.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await DockerClient.Containers.GetContainerStatsAsync(
|
||||||
|
containerId,
|
||||||
|
new()
|
||||||
|
{
|
||||||
|
Stream = true
|
||||||
|
},
|
||||||
|
new Progress<ContainerStatsResponse>(async response =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var stats = ConvertToStats(response);
|
||||||
|
|
||||||
|
await HubContext.Clients
|
||||||
|
.Group(Configuration.Id.ToString())
|
||||||
|
.SendAsync("StatsUpdated", stats);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
Logger.LogError("An error occured handling stats update: {e}", e);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
Server.TaskCancellation
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (TaskCanceledException)
|
||||||
|
{
|
||||||
|
// Ignored
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
Logger.LogError("An error occured while loading container stats: {e}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Logger.LogDebug("Stopped fetching container stats");
|
||||||
|
});
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ServerStats ConvertToStats(ContainerStatsResponse response)
|
||||||
|
{
|
||||||
|
var result = new ServerStats();
|
||||||
|
|
||||||
|
// When killed this field will be null so we just return
|
||||||
|
if (response.CPUStats.CPUUsage == null)
|
||||||
|
return result;
|
||||||
|
|
||||||
|
#region CPU
|
||||||
|
|
||||||
|
if(response.CPUStats is { CPUUsage.PercpuUsage: not null }) // Sometimes some values are just null >:/
|
||||||
|
{
|
||||||
|
var cpuDelta = (float)response.CPUStats.CPUUsage.TotalUsage - response.PreCPUStats.CPUUsage.TotalUsage;
|
||||||
|
var cpuSystemDelta = (float)response.CPUStats.SystemUsage - response.PreCPUStats.SystemUsage;
|
||||||
|
|
||||||
|
var cpuCoreCount = (int)response.CPUStats.OnlineCPUs;
|
||||||
|
|
||||||
|
if (cpuCoreCount == 0)
|
||||||
|
cpuCoreCount = response.CPUStats.CPUUsage.PercpuUsage.Count;
|
||||||
|
|
||||||
|
var cpuPercent = 0f;
|
||||||
|
|
||||||
|
if (cpuSystemDelta > 0.0f && cpuDelta > 0.0f)
|
||||||
|
{
|
||||||
|
cpuPercent = (cpuDelta / cpuSystemDelta) * 100;
|
||||||
|
|
||||||
|
if (cpuCoreCount > 0)
|
||||||
|
cpuPercent *= cpuCoreCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
result.CpuUsage = Math.Round(cpuPercent * 1000) / 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
#region Memory
|
||||||
|
|
||||||
|
result.MemoryUsage = response.MemoryStats.Usage;
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
#region Network
|
||||||
|
|
||||||
|
foreach (var network in response.Networks)
|
||||||
|
{
|
||||||
|
result.NetworkRead += network.Value.RxBytes;
|
||||||
|
result.NetworkWrite += network.Value.TxBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
#region IO
|
||||||
|
|
||||||
|
if (response.BlkioStats.IoServiceBytesRecursive != null)
|
||||||
|
{
|
||||||
|
result.IoRead = response.BlkioStats.IoServiceBytesRecursive
|
||||||
|
.FirstOrDefault(x => x.Op == "read")?.Value ?? 0;
|
||||||
|
|
||||||
|
result.IoWrite = response.BlkioStats.IoServiceBytesRecursive
|
||||||
|
.FirstOrDefault(x => x.Op == "write")?.Value ?? 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -85,7 +85,8 @@ public class ServerService : IHostedLifecycleService
|
|||||||
typeof(ConsoleSubSystem),
|
typeof(ConsoleSubSystem),
|
||||||
typeof(RestoreSubSystem),
|
typeof(RestoreSubSystem),
|
||||||
typeof(OnlineDetectionService),
|
typeof(OnlineDetectionService),
|
||||||
typeof(InstallationSubSystem)
|
typeof(InstallationSubSystem),
|
||||||
|
typeof(StatsSubSystem)
|
||||||
];
|
];
|
||||||
|
|
||||||
await server.Initialize(subSystems);
|
await server.Initialize(subSystems);
|
||||||
|
|||||||
@@ -0,0 +1,11 @@
|
|||||||
|
namespace MoonlightServers.DaemonShared.DaemonSide.Models;
|
||||||
|
|
||||||
|
public record ServerStats
|
||||||
|
{
|
||||||
|
public double CpuUsage { get; set; }
|
||||||
|
public ulong MemoryUsage { get; set; }
|
||||||
|
public ulong NetworkRead { get; set; }
|
||||||
|
public ulong NetworkWrite { get; set; }
|
||||||
|
public ulong IoRead { get; set; }
|
||||||
|
public ulong IoWrite { get; set; }
|
||||||
|
}
|
||||||
@@ -15,7 +15,6 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Folder Include="DaemonSide\"/>
|
|
||||||
<Folder Include="DaemonSide\Http\Responses\Servers\Files\"/>
|
<Folder Include="DaemonSide\Http\Responses\Servers\Files\"/>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user