From eaf8c36f7f18ed7410318e46117d67b2eadd41d0 Mon Sep 17 00:00:00 2001 From: ChiaraBm Date: Wed, 30 Jul 2025 17:12:21 +0200 Subject: [PATCH] Fixed event/observer issues --- .../MoonlightServers.Daemon.csproj | 2 +- .../ServerSys/Abstractions/Server.cs | 103 ++++++++++-------- .../Implementations/DockerConsole.cs | 17 +-- .../Implementations/DockerInstaller.cs | 19 ++-- .../Implementations/DockerProvisioner.cs | 19 ++-- .../Implementations/DockerStatistics.cs | 8 +- .../Services/DockerEventService.cs | 22 ++-- MoonlightServers.Daemon/Startup.cs | 10 +- 8 files changed, 113 insertions(+), 87 deletions(-) diff --git a/MoonlightServers.Daemon/MoonlightServers.Daemon.csproj b/MoonlightServers.Daemon/MoonlightServers.Daemon.csproj index 7e733f5..089b3d8 100644 --- a/MoonlightServers.Daemon/MoonlightServers.Daemon.csproj +++ b/MoonlightServers.Daemon/MoonlightServers.Daemon.csproj @@ -9,7 +9,7 @@ - + diff --git a/MoonlightServers.Daemon/ServerSys/Abstractions/Server.cs b/MoonlightServers.Daemon/ServerSys/Abstractions/Server.cs index 7da4052..34d9d54 100644 --- a/MoonlightServers.Daemon/ServerSys/Abstractions/Server.cs +++ b/MoonlightServers.Daemon/ServerSys/Abstractions/Server.cs @@ -1,6 +1,6 @@ -using System.Reactive.Linq; -using System.Reactive.Subjects; +using MoonCore.Observability; using MoonlightServers.Daemon.Extensions; +using MoonlightServers.Daemon.Helpers; using MoonlightServers.Daemon.ServerSystem; using Stateless; @@ -8,19 +8,19 @@ namespace MoonlightServers.Daemon.ServerSys.Abstractions; public class Server : IAsyncDisposable { - public IConsole Console { get; private set; } - public IFileSystem FileSystem { get; private set; } - public IInstaller Installer { get; private set; } - public IProvisioner Provisioner { get; private set; } - public IRestorer Restorer { get; private set; } - public IStatistics Statistics { get; private set; } + public IConsole Console { get; } + public IFileSystem FileSystem { get; } + public IInstaller Installer { get; } + public IProvisioner Provisioner { get; } + public IRestorer Restorer { get; } + public IStatistics Statistics { get; } public StateMachine StateMachine { get; private set; } - public ServerContext Context { get; private set; } - public IAsyncObservable OnState => OnStateSubject.ToAsyncObservable(); + public ServerContext Context { get; } + public IAsyncObservable OnState => OnStateSubject; - private readonly Subject OnStateSubject = new(); + private readonly EventSubject OnStateSubject = new(); private readonly ILogger Logger; - + private IAsyncDisposable? ProvisionExitSubscription; private IAsyncDisposable? InstallerExitSubscription; @@ -80,22 +80,22 @@ public class Server : IAsyncDisposable CreateStateMachine(restoredState); // Setup event handling - ProvisionExitSubscription = await Provisioner.OnExited.SubscribeAsync(async o => - { - await StateMachine.FireAsync(ServerTrigger.Exited); - }); + ProvisionExitSubscription = await Provisioner.OnExited.SubscribeEventAsync(async _ => + await StateMachine.FireAsync(ServerTrigger.Exited) + ); - InstallerExitSubscription = await Installer.OnExited.SubscribeAsync(async o => - { - await StateMachine.FireAsync(ServerTrigger.Exited); - }); + InstallerExitSubscription = await Installer.OnExited.SubscribeEventAsync(async _ => + await StateMachine.FireAsync(ServerTrigger.Exited) + ); } private void CreateStateMachine(ServerState initialState) { StateMachine = new StateMachine(initialState, FiringMode.Queued); - - StateMachine.OnTransitioned(transition => OnStateSubject.OnNext(transition.Destination)); + + StateMachine.OnTransitionedAsync(async transition + => await OnStateSubject.OnNextAsync(transition.Destination) + ); // Configure basic state machine flow @@ -134,7 +134,7 @@ public class Server : IAsyncDisposable StateMachine.Configure(ServerState.Installing) .OnEntryAsync(HandleInstall) .OnExitFromAsync(ServerTrigger.Exited, HandleInstallExit); - + StateMachine.Configure(ServerState.Online) .OnExitFromAsync(ServerTrigger.Exited, HandleRuntimeExit); @@ -187,9 +187,12 @@ public class Server : IAsyncDisposable catch (Exception e) { Logger.LogError(e, "An error occured while starting the server"); + await Console.WriteToMoonlight("Failed to start the server. More details can be found in the daemon logs"); + + await StateMachine.FireAsync(ServerTrigger.FailSafe); } } - + private async Task HandleStop() { await Provisioner.Stop(); @@ -199,41 +202,51 @@ public class Server : IAsyncDisposable { Logger.LogDebug("Deprovisioning"); await Console.WriteToMoonlight("Deprovisioning"); - + await Provisioner.Deprovision(); } private async Task HandleInstall() { - Logger.LogDebug("Installing"); - - Logger.LogDebug("Setting up"); - await Console.WriteToMoonlight("Setting up installation"); - - // TODO: Extract to service - - Context.InstallConfiguration = new() + try { - Shell = "/bin/ash", - DockerImage = "ghcr.io/parkervcp/installers:alpine", - Script = - "#!/bin/ash\n# Paper Installation Script\n#\n# Server Files: /mnt/server\nPROJECT=paper\n\nif [ -n \"${DL_PATH}\" ]; then\n\techo -e \"Using supplied download url: ${DL_PATH}\"\n\tDOWNLOAD_URL=`eval echo $(echo ${DL_PATH} | sed -e 's/{{/${/g' -e 's/}}/}/g')`\nelse\n\tVER_EXISTS=`curl -s https://api.papermc.io/v2/projects/${PROJECT} | jq -r --arg VERSION $MINECRAFT_VERSION '.versions[] | contains($VERSION)' | grep -m1 true`\n\tLATEST_VERSION=`curl -s https://api.papermc.io/v2/projects/${PROJECT} | jq -r '.versions' | jq -r '.[-1]'`\n\n\tif [ \"${VER_EXISTS}\" == \"true\" ]; then\n\t\techo -e \"Version is valid. Using version ${MINECRAFT_VERSION}\"\n\telse\n\t\techo -e \"Specified version not found. Defaulting to the latest ${PROJECT} version\"\n\t\tMINECRAFT_VERSION=${LATEST_VERSION}\n\tfi\n\n\tBUILD_EXISTS=`curl -s https://api.papermc.io/v2/projects/${PROJECT}/versions/${MINECRAFT_VERSION} | jq -r --arg BUILD ${BUILD_NUMBER} '.builds[] | tostring | contains($BUILD)' | grep -m1 true`\n\tLATEST_BUILD=`curl -s https://api.papermc.io/v2/projects/${PROJECT}/versions/${MINECRAFT_VERSION} | jq -r '.builds' | jq -r '.[-1]'`\n\n\tif [ \"${BUILD_EXISTS}\" == \"true\" ]; then\n\t\techo -e \"Build is valid for version ${MINECRAFT_VERSION}. Using build ${BUILD_NUMBER}\"\n\telse\n\t\techo -e \"Using the latest ${PROJECT} build for version ${MINECRAFT_VERSION}\"\n\t\tBUILD_NUMBER=${LATEST_BUILD}\n\tfi\n\n\tJAR_NAME=${PROJECT}-${MINECRAFT_VERSION}-${BUILD_NUMBER}.jar\n\n\techo \"Version being downloaded\"\n\techo -e \"MC Version: ${MINECRAFT_VERSION}\"\n\techo -e \"Build: ${BUILD_NUMBER}\"\n\techo -e \"JAR Name of Build: ${JAR_NAME}\"\n\tDOWNLOAD_URL=https://api.papermc.io/v2/projects/${PROJECT}/versions/${MINECRAFT_VERSION}/builds/${BUILD_NUMBER}/downloads/${JAR_NAME}\nfi\n\ncd /mnt/server\n\necho -e \"Running curl -o ${SERVER_JARFILE} ${DOWNLOAD_URL}\"\n\nif [ -f ${SERVER_JARFILE} ]; then\n\tmv ${SERVER_JARFILE} ${SERVER_JARFILE}.old\nfi\n\ncurl -o ${SERVER_JARFILE} ${DOWNLOAD_URL}\n\nif [ ! -f server.properties ]; then\n echo -e \"Downloading MC server.properties\"\n curl -o server.properties https://raw.githubusercontent.com/parkervcp/eggs/master/minecraft/java/server.properties\nfi" - }; - - await Installer.Setup(); + Logger.LogDebug("Installing"); - await Console.AttachToInstallation(); + Logger.LogDebug("Setting up"); + await Console.WriteToMoonlight("Setting up installation"); - await Installer.Start(); + // TODO: Extract to service + + Context.InstallConfiguration = new() + { + Shell = "/bin/ash", + DockerImage = "ghcr.io/parkervcp/installers:alpine", + Script = + "#!/bin/ash\n# Paper Installation Script\n#\n# Server Files: /mnt/server\nPROJECT=paper\n\nif [ -n \"${DL_PATH}\" ]; then\n\techo -e \"Using supplied download url: ${DL_PATH}\"\n\tDOWNLOAD_URL=`eval echo $(echo ${DL_PATH} | sed -e 's/{{/${/g' -e 's/}}/}/g')`\nelse\n\tVER_EXISTS=`curl -s https://api.papermc.io/v2/projects/${PROJECT} | jq -r --arg VERSION $MINECRAFT_VERSION '.versions[] | contains($VERSION)' | grep -m1 true`\n\tLATEST_VERSION=`curl -s https://api.papermc.io/v2/projects/${PROJECT} | jq -r '.versions' | jq -r '.[-1]'`\n\n\tif [ \"${VER_EXISTS}\" == \"true\" ]; then\n\t\techo -e \"Version is valid. Using version ${MINECRAFT_VERSION}\"\n\telse\n\t\techo -e \"Specified version not found. Defaulting to the latest ${PROJECT} version\"\n\t\tMINECRAFT_VERSION=${LATEST_VERSION}\n\tfi\n\n\tBUILD_EXISTS=`curl -s https://api.papermc.io/v2/projects/${PROJECT}/versions/${MINECRAFT_VERSION} | jq -r --arg BUILD ${BUILD_NUMBER} '.builds[] | tostring | contains($BUILD)' | grep -m1 true`\n\tLATEST_BUILD=`curl -s https://api.papermc.io/v2/projects/${PROJECT}/versions/${MINECRAFT_VERSION} | jq -r '.builds' | jq -r '.[-1]'`\n\n\tif [ \"${BUILD_EXISTS}\" == \"true\" ]; then\n\t\techo -e \"Build is valid for version ${MINECRAFT_VERSION}. Using build ${BUILD_NUMBER}\"\n\telse\n\t\techo -e \"Using the latest ${PROJECT} build for version ${MINECRAFT_VERSION}\"\n\t\tBUILD_NUMBER=${LATEST_BUILD}\n\tfi\n\n\tJAR_NAME=${PROJECT}-${MINECRAFT_VERSION}-${BUILD_NUMBER}.jar\n\n\techo \"Version being downloaded\"\n\techo -e \"MC Version: ${MINECRAFT_VERSION}\"\n\techo -e \"Build: ${BUILD_NUMBER}\"\n\techo -e \"JAR Name of Build: ${JAR_NAME}\"\n\tDOWNLOAD_URL=https://api.papermc.io/v2/projects/${PROJECT}/versions/${MINECRAFT_VERSION}/builds/${BUILD_NUMBER}/downloads/${JAR_NAME}\nfi\n\ncd /mnt/server\n\necho -e \"Running curl -o ${SERVER_JARFILE} ${DOWNLOAD_URL}\"\n\nif [ -f ${SERVER_JARFILE} ]; then\n\tmv ${SERVER_JARFILE} ${SERVER_JARFILE}.old\nfi\n\ncurl -o ${SERVER_JARFILE} ${DOWNLOAD_URL}\n\nif [ ! -f server.properties ]; then\n echo -e \"Downloading MC server.properties\"\n curl -o server.properties https://raw.githubusercontent.com/parkervcp/eggs/master/minecraft/java/server.properties\nfi" + }; + + await Installer.Setup(); + + await Console.AttachToInstallation(); + + await Installer.Start(); + } + catch (Exception e) + { + Logger.LogError(e, "An error occured while starting installation"); + await Console.WriteToMoonlight("An error occured while starting installation"); + + await StateMachine.FireAsync(ServerTrigger.FailSafe); + } } - + private async Task HandleInstallExit() { Logger.LogDebug("Installation done"); await Console.WriteToMoonlight("Cleaning up"); - + await Installer.Cleanup(); - + await Console.WriteToMoonlight("Installation completed"); } diff --git a/MoonlightServers.Daemon/ServerSys/Implementations/DockerConsole.cs b/MoonlightServers.Daemon/ServerSys/Implementations/DockerConsole.cs index e87b8ce..8d8663f 100644 --- a/MoonlightServers.Daemon/ServerSys/Implementations/DockerConsole.cs +++ b/MoonlightServers.Daemon/ServerSys/Implementations/DockerConsole.cs @@ -4,17 +4,19 @@ using System.Text; using Docker.DotNet; using Docker.DotNet.Models; using MoonCore.Helpers; +using MoonCore.Observability; +using MoonlightServers.Daemon.Helpers; using MoonlightServers.Daemon.ServerSys.Abstractions; namespace MoonlightServers.Daemon.ServerSys.Implementations; public class DockerConsole : IConsole { - public IAsyncObservable OnOutput => OnOutputSubject.ToAsyncObservable(); - public IAsyncObservable OnInput => OnInputSubject.ToAsyncObservable(); + public IAsyncObservable OnOutput => OnOutputSubject; + public IAsyncObservable OnInput => OnInputSubject; - private readonly Subject OnOutputSubject = new(); - private readonly Subject OnInputSubject = new(); + private readonly EventSubject OnOutputSubject = new(); + private readonly EventSubject OnInputSubject = new(); private readonly ConcurrentList OutputCache = new(); private readonly DockerClient DockerClient; @@ -140,15 +142,14 @@ public class DockerConsole : IConsole return Task.CompletedTask; } - public Task WriteToOutput(string content) + public async Task WriteToOutput(string content) { OutputCache.Add(content); if (OutputCache.Count > 250) // TODO: Config OutputCache.RemoveRange(0, 100); - OnOutputSubject.OnNext(content); - return Task.CompletedTask; + await OnOutputSubject.OnNextAsync(content); } public async Task WriteToInput(string content) @@ -164,6 +165,8 @@ public class DockerConsole : IConsole contentBuffer.Length, Cts.Token ); + + await OnInputSubject.OnNextAsync(content); } public async Task WriteToMoonlight(string content) diff --git a/MoonlightServers.Daemon/ServerSys/Implementations/DockerInstaller.cs b/MoonlightServers.Daemon/ServerSys/Implementations/DockerInstaller.cs index b2a00d9..1a29bb9 100644 --- a/MoonlightServers.Daemon/ServerSys/Implementations/DockerInstaller.cs +++ b/MoonlightServers.Daemon/ServerSys/Implementations/DockerInstaller.cs @@ -2,7 +2,10 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using Docker.DotNet; using Docker.DotNet.Models; +using MoonCore.Observability; using MoonlightServers.Daemon.Configuration; +using MoonlightServers.Daemon.Extensions; +using MoonlightServers.Daemon.Helpers; using MoonlightServers.Daemon.Mappers; using MoonlightServers.Daemon.ServerSys.Abstractions; using MoonlightServers.Daemon.Services; @@ -11,10 +14,10 @@ namespace MoonlightServers.Daemon.ServerSys.Implementations; public class DockerInstaller : IInstaller { - public IAsyncObservable OnExited => OnExitedSubject.ToAsyncObservable(); + public IAsyncObservable OnExited => OnExitedSubject; public bool IsRunning { get; private set; } = false; - private readonly Subject OnExitedSubject = new(); + private readonly EventSubject OnExitedSubject = new(); private readonly ILogger Logger; private readonly DockerEventService EventService; @@ -64,7 +67,7 @@ public class DockerInstaller : IInstaller ContainerEventSubscription = await EventService .OnContainerEvent - .SubscribeAsync(HandleContainerEvent); + .SubscribeEventAsync(HandleContainerEvent); // Check for any already existing runtime container to reclaim Logger.LogDebug("Searching for orphan container to reclaim"); @@ -82,19 +85,17 @@ public class DockerInstaller : IInstaller } } - private ValueTask HandleContainerEvent(Message message) + private async ValueTask HandleContainerEvent(Message message) { // Only handle events for our own container if (message.ID != ContainerId) - return ValueTask.CompletedTask; + return; // Only handle die events if (message.Action != "die") - return ValueTask.CompletedTask; + return; - OnExitedSubject.OnNext(message); - - return ValueTask.CompletedTask; + await OnExitedSubject.OnNextAsync(message); } public Task Sync() diff --git a/MoonlightServers.Daemon/ServerSys/Implementations/DockerProvisioner.cs b/MoonlightServers.Daemon/ServerSys/Implementations/DockerProvisioner.cs index c03dd2a..6da65ff 100644 --- a/MoonlightServers.Daemon/ServerSys/Implementations/DockerProvisioner.cs +++ b/MoonlightServers.Daemon/ServerSys/Implementations/DockerProvisioner.cs @@ -3,6 +3,9 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using Docker.DotNet; using Docker.DotNet.Models; +using MoonCore.Observability; +using MoonlightServers.Daemon.Extensions; +using MoonlightServers.Daemon.Helpers; using MoonlightServers.Daemon.Mappers; using MoonlightServers.Daemon.ServerSys.Abstractions; using MoonlightServers.Daemon.Services; @@ -11,7 +14,7 @@ namespace MoonlightServers.Daemon.ServerSys.Implementations; public class DockerProvisioner : IProvisioner { - public IAsyncObservable OnExited => OnExitedSubject.ToAsyncObservable(); + public IAsyncObservable OnExited => OnExitedSubject; public bool IsProvisioned { get; private set; } private readonly DockerClient DockerClient; @@ -23,7 +26,7 @@ public class DockerProvisioner : IProvisioner private readonly ServerConfigurationMapper Mapper; private readonly IFileSystem FileSystem; - private Subject OnExitedSubject = new(); + private EventSubject OnExitedSubject = new(); private string? ContainerId; private string ContainerName; @@ -56,7 +59,7 @@ public class DockerProvisioner : IProvisioner ContainerEventSubscription = await EventService .OnContainerEvent - .SubscribeAsync(HandleContainerEvent); + .SubscribeEventAsync(HandleContainerEvent); // Check for any already existing runtime container to reclaim Logger.LogDebug("Searching for orphan container to reclaim"); @@ -74,19 +77,17 @@ public class DockerProvisioner : IProvisioner } } - private ValueTask HandleContainerEvent(Message message) + private async ValueTask HandleContainerEvent(Message message) { // Only handle events for our own container if (message.ID != ContainerId) - return ValueTask.CompletedTask; + return; // Only handle die events if (message.Action != "die") - return ValueTask.CompletedTask; + return; - OnExitedSubject.OnNext(message); - - return ValueTask.CompletedTask; + await OnExitedSubject.OnNextAsync(message); } public Task Sync() diff --git a/MoonlightServers.Daemon/ServerSys/Implementations/DockerStatistics.cs b/MoonlightServers.Daemon/ServerSys/Implementations/DockerStatistics.cs index c127048..23a9617 100644 --- a/MoonlightServers.Daemon/ServerSys/Implementations/DockerStatistics.cs +++ b/MoonlightServers.Daemon/ServerSys/Implementations/DockerStatistics.cs @@ -1,14 +1,14 @@ -using System.Reactive.Linq; -using System.Reactive.Subjects; +using MoonCore.Observability; +using MoonlightServers.Daemon.Helpers; using MoonlightServers.Daemon.ServerSys.Abstractions; namespace MoonlightServers.Daemon.ServerSys.Implementations; public class DockerStatistics : IStatistics { - public IAsyncObservable OnStats => OnStatsSubject.ToAsyncObservable(); + public IAsyncObservable OnStats => OnStatsSubject; - private readonly Subject OnStatsSubject = new(); + private readonly EventSubject OnStatsSubject = new(); public Task Initialize() => Task.CompletedTask; diff --git a/MoonlightServers.Daemon/Services/DockerEventService.cs b/MoonlightServers.Daemon/Services/DockerEventService.cs index f1f134c..4bb9f88 100644 --- a/MoonlightServers.Daemon/Services/DockerEventService.cs +++ b/MoonlightServers.Daemon/Services/DockerEventService.cs @@ -3,6 +3,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using Docker.DotNet; using Docker.DotNet.Models; +using MoonCore.Observability; +using MoonlightServers.Daemon.Helpers; namespace MoonlightServers.Daemon.Services; @@ -11,13 +13,13 @@ public class DockerEventService : BackgroundService private readonly ILogger Logger; private readonly DockerClient DockerClient; - public IAsyncObservable OnContainerEvent => OnContainerSubject.ToAsyncObservable().ObserveOn(TaskPoolAsyncScheduler.Default); - public IAsyncObservable OnImageEvent => OnImageSubject.ToAsyncObservable().ObserveOn(TaskPoolAsyncScheduler.Default); - public IAsyncObservable OnNetworkEvent => OnNetworkSubject.ToAsyncObservable().ObserveOn(TaskPoolAsyncScheduler.Default); + public IAsyncObservable OnContainerEvent => OnContainerSubject; + public IAsyncObservable OnImageEvent => OnImageSubject; + public IAsyncObservable OnNetworkEvent => OnNetworkSubject; - private readonly Subject OnContainerSubject = new(); - private readonly Subject OnImageSubject = new(); - private readonly Subject OnNetworkSubject = new(); + private readonly EventSubject OnContainerSubject = new(); + private readonly EventSubject OnImageSubject = new(); + private readonly EventSubject OnNetworkSubject = new(); public DockerEventService( ILogger logger, @@ -38,22 +40,22 @@ public class DockerEventService : BackgroundService { await DockerClient.System.MonitorEventsAsync( new ContainerEventsParameters(), - new Progress(message => + new Progress(async message => { try { switch (message.Type) { case "container": - OnContainerSubject.OnNext(message); + await OnContainerSubject.OnNextAsync(message); break; case "image": - OnImageSubject.OnNext(message); + await OnImageSubject.OnNextAsync(message); break; case "network": - OnNetworkSubject.OnNext(message); + await OnNetworkSubject.OnNextAsync(message); break; } } diff --git a/MoonlightServers.Daemon/Startup.cs b/MoonlightServers.Daemon/Startup.cs index b85eaf3..3fdb2c1 100644 --- a/MoonlightServers.Daemon/Startup.cs +++ b/MoonlightServers.Daemon/Startup.cs @@ -11,7 +11,9 @@ using MoonCore.Extended.Extensions; using MoonCore.Extensions; using MoonCore.Helpers; using MoonCore.Logging; +using MoonCore.Observability; using MoonlightServers.Daemon.Configuration; +using MoonlightServers.Daemon.Extensions; using MoonlightServers.Daemon.Helpers; using MoonlightServers.Daemon.Http.Hubs; using MoonlightServers.Daemon.Mappers; @@ -108,9 +110,13 @@ public class Startup var server = factory.CreateServer(config); await using var consoleSub = await server.Console.OnOutput - .SubscribeAsync(Console.Write); + .SubscribeEventAsync(line => + { + Console.Write(line); + return ValueTask.CompletedTask; + }); - await using var stateSub = await server.OnState.SubscribeAsync(state => + await using var stateSub = await server.OnState.SubscribeEventAsync(state => { Console.WriteLine($"State: {state}"); return ValueTask.CompletedTask;