using System.Buffers; using System.Text; using Docker.DotNet; using Docker.DotNet.Models; using MoonlightServers.Daemon.ServerSystem.Abstractions; namespace MoonlightServers.Daemon.ServerSystem.Implementations.Docker; public class DockerConsole : IRuntimeConsole, IInstallConsole, IAsyncDisposable { public event Func? OnOutput; private MultiplexedStream? Stream; private readonly string ContainerId; private readonly DockerClient DockerClient; private readonly ILogger Logger; private readonly List Cache = new(302); private readonly SemaphoreSlim CacheLock = new(1, 1); private readonly CancellationTokenSource Cts = new(); public DockerConsole( string containerId, DockerClient dockerClient, ILogger logger ) { ContainerId = containerId; DockerClient = dockerClient; Logger = logger; } public async Task AttachAsync() { // Fetch initial logs Logger.LogTrace("Fetching pre-existing logs from container"); var logResponse = await DockerClient.Containers.GetContainerLogsAsync( ContainerId, new() { Follow = false, ShowStderr = true, ShowStdout = true } ); // Append to cache var logs = await logResponse.ReadOutputToEndAsync(Cts.Token); await CacheLock.WaitAsync(Cts.Token); try { Cache.Add(logs.stdout); Cache.Add(logs.stderr); } finally { CacheLock.Release(); } // Stream new logs Logger.LogTrace("Starting log streaming"); Task.Run(async () => { var capturedCt = Cts.Token; Logger.LogTrace("Starting attach loop"); while (!capturedCt.IsCancellationRequested) { try { using var stream = await DockerClient.Containers.AttachContainerAsync( ContainerId, new ContainerAttachParameters() { Stderr = true, Stdin = true, Stdout = true, Stream = true }, capturedCt ); // Make stream accessible from the outside Stream = stream; const int bufferSize = 1024; var buffer = ArrayPool.Shared.Rent(bufferSize); while (!capturedCt.IsCancellationRequested) { try { var readResult = await stream.ReadOutputAsync(buffer, 0, bufferSize, capturedCt); if (readResult.Count > 0) { var decodedBuffer = Encoding.UTF8.GetString(buffer, 0, readResult.Count); await CacheLock.WaitAsync(capturedCt); try { if (Cache.Count > 300) Cache.RemoveRange(0, 50); Cache.Add(decodedBuffer); } finally { CacheLock.Release(); } if (OnOutput != null) await OnOutput.Invoke(decodedBuffer); } if (readResult.EOF) break; } catch (OperationCanceledException) { // Ignored } catch (Exception e) { Logger.LogError(e, "An unhandled error occured while processing container stream"); } } ArrayPool.Shared.Return(buffer); } catch (OperationCanceledException) { // Ignored } catch (Exception e) { Logger.LogError(e, "An unhandled error occured while handling container attaching"); } } Logger.LogTrace("Attach loop exited"); }); } public async Task WriteInputAsync(string value) { if (Stream == null) throw new AggregateException("Stream is not available. Container might not be attached"); var buffer = Encoding.UTF8.GetBytes(value); await Stream.WriteAsync(buffer, 0, buffer.Length, Cts.Token); } public async Task ClearCacheAsync() { await CacheLock.WaitAsync(Cts.Token); try { Cache.Clear(); } finally { CacheLock.Release(); } } public async Task GetCacheAsync() { await CacheLock.WaitAsync(); try { return Cache.ToArray(); } finally { CacheLock.Release(); } } public async ValueTask DisposeAsync() { await Cts.CancelAsync(); Stream?.Dispose(); CacheLock.Dispose(); } }