Fixed usage of IAsyncObservable. Added runtime exit handler

This commit is contained in:
2025-07-29 21:14:41 +02:00
parent b546a168d2
commit f57d33cb1e
9 changed files with 85 additions and 57 deletions

View File

@@ -2,8 +2,8 @@ namespace MoonlightServers.Daemon.ServerSys.Abstractions;
public interface IConsole : IServerComponent public interface IConsole : IServerComponent
{ {
public IObservable<string> OnOutput { get; } public IAsyncObservable<string> OnOutput { get; }
public IObservable<string> OnInput { get; } public IAsyncObservable<string> OnInput { get; }
public Task AttachToRuntime(); public Task AttachToRuntime();
public Task AttachToInstallation(); public Task AttachToInstallation();

View File

@@ -2,7 +2,7 @@ namespace MoonlightServers.Daemon.ServerSys.Abstractions;
public interface IInstaller : IServerComponent public interface IInstaller : IServerComponent
{ {
public IObservable<object> OnExited { get; } public IAsyncObservable<object> OnExited { get; }
public bool IsRunning { get; } public bool IsRunning { get; }
public Task Start(); public Task Start();

View File

@@ -2,7 +2,7 @@ namespace MoonlightServers.Daemon.ServerSys.Abstractions;
public interface IProvisioner : IServerComponent public interface IProvisioner : IServerComponent
{ {
public IObservable<object> OnExited { get; } public IAsyncObservable<object> OnExited { get; }
public bool IsProvisioned { get; } public bool IsProvisioned { get; }
public Task Provision(); public Task Provision();

View File

@@ -1,4 +1,6 @@
using System.Reactive.Linq;
using System.Reactive.Subjects; using System.Reactive.Subjects;
using MoonlightServers.Daemon.Extensions;
using MoonlightServers.Daemon.ServerSystem; using MoonlightServers.Daemon.ServerSystem;
using Stateless; using Stateless;
@@ -14,13 +16,13 @@ public class Server : IAsyncDisposable
public IStatistics Statistics { get; private set; } public IStatistics Statistics { get; private set; }
public StateMachine<ServerState, ServerTrigger> StateMachine { get; private set; } public StateMachine<ServerState, ServerTrigger> StateMachine { get; private set; }
public ServerContext Context { get; private set; } public ServerContext Context { get; private set; }
public IObservable<ServerState> OnState => OnStateSubject; public IAsyncObservable<ServerState> OnState => OnStateSubject.ToAsyncObservable();
private readonly Subject<ServerState> OnStateSubject = new(); private readonly Subject<ServerState> OnStateSubject = new();
private readonly ILogger<Server> Logger; private readonly ILogger<Server> Logger;
private IDisposable? ProvisionExitSubscription; private IAsyncDisposable? ProvisionExitSubscription;
private IDisposable? InstallerExitSubscription; private IAsyncDisposable? InstallerExitSubscription;
public Server( public Server(
ILogger<Server> logger, ILogger<Server> logger,
@@ -78,14 +80,14 @@ public class Server : IAsyncDisposable
CreateStateMachine(restoredState); CreateStateMachine(restoredState);
// Setup event handling // Setup event handling
ProvisionExitSubscription = Provisioner.OnExited.Subscribe(o => ProvisionExitSubscription = await Provisioner.OnExited.SubscribeAsync(async o =>
{ {
StateMachine.Fire(ServerTrigger.Exited); await StateMachine.FireAsync(ServerTrigger.Exited);
}); });
InstallerExitSubscription = Installer.OnExited.Subscribe(o => InstallerExitSubscription = await Installer.OnExited.SubscribeAsync(async o =>
{ {
StateMachine.Fire(ServerTrigger.Exited); await StateMachine.FireAsync(ServerTrigger.Exited);
}); });
} }
@@ -126,10 +128,15 @@ public class Server : IAsyncDisposable
// Handle transitions // Handle transitions
StateMachine.Configure(ServerState.Starting) StateMachine.Configure(ServerState.Starting)
.OnEntryAsync(HandleStart); .OnEntryAsync(HandleStart)
.OnExitFromAsync(ServerTrigger.Exited, HandleRuntimeExit);
StateMachine.Configure(ServerState.Online)
.OnExitFromAsync(ServerTrigger.Exited, HandleRuntimeExit);
StateMachine.Configure(ServerState.Stopping) StateMachine.Configure(ServerState.Stopping)
.OnEntryFromAsync(ServerTrigger.Stop, HandleStop); .OnEntryFromAsync(ServerTrigger.Stop, HandleStop)
.OnExitFromAsync(ServerTrigger.Exited, HandleRuntimeExit);
} }
#region State machine handlers #region State machine handlers
@@ -184,15 +191,23 @@ public class Server : IAsyncDisposable
await Provisioner.Stop(); await Provisioner.Stop();
} }
private async Task HandleRuntimeExit()
{
Logger.LogDebug("Deprovisioning");
await Console.WriteToMoonlight("Deprovisioning");
await Provisioner.Deprovision();
}
#endregion #endregion
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
if (ProvisionExitSubscription != null) if (ProvisionExitSubscription != null)
ProvisionExitSubscription.Dispose(); await ProvisionExitSubscription.DisposeAsync();
if (InstallerExitSubscription != null) if (InstallerExitSubscription != null)
InstallerExitSubscription.Dispose(); await InstallerExitSubscription.DisposeAsync();
await Console.DisposeAsync(); await Console.DisposeAsync();
await FileSystem.DisposeAsync(); await FileSystem.DisposeAsync();

View File

@@ -3,7 +3,6 @@ using System.Reactive.Subjects;
using System.Text; using System.Text;
using Docker.DotNet; using Docker.DotNet;
using Docker.DotNet.Models; using Docker.DotNet.Models;
using Microsoft.Extensions.Options;
using MoonCore.Helpers; using MoonCore.Helpers;
using MoonlightServers.Daemon.ServerSys.Abstractions; using MoonlightServers.Daemon.ServerSys.Abstractions;
@@ -11,8 +10,8 @@ namespace MoonlightServers.Daemon.ServerSys.Implementations;
public class DockerConsole : IConsole public class DockerConsole : IConsole
{ {
public IObservable<string> OnOutput => OnOutputSubject; public IAsyncObservable<string> OnOutput => OnOutputSubject.ToAsyncObservable();
public IObservable<string> OnInput => OnInputSubject; public IAsyncObservable<string> OnInput => OnInputSubject.ToAsyncObservable();
private readonly Subject<string> OnOutputSubject = new(); private readonly Subject<string> OnOutputSubject = new();
private readonly Subject<string> OnInputSubject = new(); private readonly Subject<string> OnInputSubject = new();
@@ -146,13 +145,7 @@ public class DockerConsole : IConsole
OutputCache.Add(content); OutputCache.Add(content);
if (OutputCache.Count > 250) // TODO: Config if (OutputCache.Count > 250) // TODO: Config
{ OutputCache.RemoveRange(0, 100);
// TODO: Replace with remove range once it becomes available in mooncore
for (var i = 0; i < 100; i++)
{
OutputCache.RemoveAt(i);
}
}
OnOutputSubject.OnNext(content); OnOutputSubject.OnNext(content);
return Task.CompletedTask; return Task.CompletedTask;

View File

@@ -1,21 +1,30 @@
using System.Reactive.Linq; using System.Reactive.Linq;
using System.Reactive.Subjects; using System.Reactive.Subjects;
using MoonlightServers.Daemon.ServerSys.Abstractions; using MoonlightServers.Daemon.ServerSys.Abstractions;
using MoonlightServers.Daemon.Services;
namespace MoonlightServers.Daemon.ServerSys.Implementations; namespace MoonlightServers.Daemon.ServerSys.Implementations;
public class DockerInstaller : IInstaller public class DockerInstaller : IInstaller
{ {
public IObservable<object> OnExited => OnExitedSubject; public IAsyncObservable<object> OnExited => OnExitedSubject.ToAsyncObservable();
public bool IsRunning { get; private set; } = false; public bool IsRunning { get; private set; } = false;
private readonly Subject<string> OnExitedSubject = new();
private readonly ILogger<DockerInstaller> Logger;
public DockerInstaller(ILogger<DockerInstaller> logger) private readonly Subject<string> OnExitedSubject = new();
private readonly ILogger<DockerInstaller> Logger;
private readonly DockerEventService EventService;
private string? ContainerId;
private string? ContainerName;
public DockerInstaller(
ILogger<DockerInstaller> logger,
DockerEventService eventService
)
{ {
Logger = logger; Logger = logger;
EventService = eventService;
} }
public Task Initialize() public Task Initialize()
@@ -27,7 +36,7 @@ public class DockerInstaller : IInstaller
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
public Task Start() public Task Start()
{ {
throw new NotImplementedException(); throw new NotImplementedException();
@@ -47,7 +56,7 @@ public class DockerInstaller : IInstaller
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
OnExitedSubject.Dispose(); OnExitedSubject.Dispose();

View File

@@ -11,7 +11,7 @@ namespace MoonlightServers.Daemon.ServerSys.Implementations;
public class DockerProvisioner : IProvisioner public class DockerProvisioner : IProvisioner
{ {
public IObservable<object> OnExited => OnExitedSubject; public IAsyncObservable<object> OnExited => OnExitedSubject.ToAsyncObservable();
public bool IsProvisioned { get; private set; } public bool IsProvisioned { get; private set; }
private readonly DockerClient DockerClient; private readonly DockerClient DockerClient;
@@ -27,7 +27,7 @@ public class DockerProvisioner : IProvisioner
private string? ContainerId; private string? ContainerId;
private string ContainerName; private string ContainerName;
private IDisposable? ContainerEventSubscription; private IAsyncDisposable? ContainerEventSubscription;
public DockerProvisioner( public DockerProvisioner(
DockerClient dockerClient, DockerClient dockerClient,
@@ -54,9 +54,9 @@ public class DockerProvisioner : IProvisioner
{ {
ContainerName = $"moonlight-runtime-{Context.Configuration.Id}"; ContainerName = $"moonlight-runtime-{Context.Configuration.Id}";
ContainerEventSubscription = EventService ContainerEventSubscription = await EventService
.OnContainerEvent .OnContainerEvent
.Subscribe(HandleContainerEvent); .SubscribeAsync(HandleContainerEvent);
// Check for any already existing runtime container to reclaim // Check for any already existing runtime container to reclaim
Logger.LogDebug("Searching for orphan container to reclaim"); Logger.LogDebug("Searching for orphan container to reclaim");
@@ -74,17 +74,19 @@ public class DockerProvisioner : IProvisioner
} }
} }
private void HandleContainerEvent(Message message) private ValueTask HandleContainerEvent(Message message)
{ {
// Only handle events for our own container // Only handle events for our own container
if (message.ID != ContainerId) if (message.ID != ContainerId)
return; return ValueTask.CompletedTask;
// Only handle die events // Only handle die events
if (message.Action != "die") if (message.Action != "die")
return; return ValueTask.CompletedTask;
OnExitedSubject.OnNext(message); OnExitedSubject.OnNext(message);
return ValueTask.CompletedTask;
} }
public Task Sync() public Task Sync()
@@ -253,6 +255,6 @@ public class DockerProvisioner : IProvisioner
OnExitedSubject.Dispose(); OnExitedSubject.Dispose();
if (ContainerEventSubscription != null) if (ContainerEventSubscription != null)
ContainerEventSubscription.Dispose(); await ContainerEventSubscription.DisposeAsync();
} }
} }

View File

@@ -1,3 +1,4 @@
using System.Reactive.Concurrency;
using System.Reactive.Linq; using System.Reactive.Linq;
using System.Reactive.Subjects; using System.Reactive.Subjects;
using Docker.DotNet; using Docker.DotNet;
@@ -10,9 +11,9 @@ public class DockerEventService : BackgroundService
private readonly ILogger<DockerEventService> Logger; private readonly ILogger<DockerEventService> Logger;
private readonly DockerClient DockerClient; private readonly DockerClient DockerClient;
public IObservable<Message> OnContainerEvent => OnContainerSubject; public IAsyncObservable<Message> OnContainerEvent => OnContainerSubject.ToAsyncObservable().ObserveOn(TaskPoolAsyncScheduler.Default);
public IObservable<Message> OnImageEvent => OnImageSubject; public IAsyncObservable<Message> OnImageEvent => OnImageSubject.ToAsyncObservable().ObserveOn(TaskPoolAsyncScheduler.Default);
public IObservable<Message> OnNetworkEvent => OnNetworkSubject; public IAsyncObservable<Message> OnNetworkEvent => OnNetworkSubject.ToAsyncObservable().ObserveOn(TaskPoolAsyncScheduler.Default);
private readonly Subject<Message> OnContainerSubject = new(); private readonly Subject<Message> OnContainerSubject = new();
private readonly Subject<Message> OnImageSubject = new(); private readonly Subject<Message> OnImageSubject = new();
@@ -39,19 +40,26 @@ public class DockerEventService : BackgroundService
new ContainerEventsParameters(), new ContainerEventsParameters(),
new Progress<Message>(message => new Progress<Message>(message =>
{ {
switch (message.Type) try
{ {
case "container": switch (message.Type)
OnContainerSubject.OnNext(message); {
break; case "container":
OnContainerSubject.OnNext(message);
break;
case "image": case "image":
OnImageSubject.OnNext(message); OnImageSubject.OnNext(message);
break; break;
case "network": case "network":
OnNetworkSubject.OnNext(message); OnNetworkSubject.OnNext(message);
break; break;
}
}
catch (Exception e)
{
Logger.LogError(e, "An error occured while processing docker event");
} }
}), }),
stoppingToken stoppingToken

View File

@@ -107,12 +107,13 @@ public class Startup
var factory = WebApplication.Services.GetRequiredService<ServerFactory>(); var factory = WebApplication.Services.GetRequiredService<ServerFactory>();
var server = factory.CreateServer(config); var server = factory.CreateServer(config);
using var consoleSub = server.Console.OnOutput await using var consoleSub = await server.Console.OnOutput
.Subscribe(Console.Write); .SubscribeAsync(Console.Write);
using var stateSub = server.OnState.Subscribe(state => await using var stateSub = await server.OnState.SubscribeAsync(state =>
{ {
Console.WriteLine($"State: {state}"); Console.WriteLine($"State: {state}");
return ValueTask.CompletedTask;
}); });
await server.Initialize(); await server.Initialize();