using System.Buffers; using System.Text; using Docker.DotNet; using Docker.DotNet.Models; using MoonlightServers.Daemon.ServerSystem.Abstractions; namespace MoonlightServers.Daemon.ServerSystem.Implementations.Docker; public class DockerConsole : IRuntimeConsole, IInstallConsole, IAsyncDisposable { public event Func? OnOutput; private MultiplexedStream? Stream; private readonly string ContainerId; private readonly DockerClient DockerClient; private readonly ILogger Logger; private readonly List Cache = new(302); private readonly SemaphoreSlim CacheLock = new(1, 1); private readonly CancellationTokenSource Cts = new(); public DockerConsole( string containerId, DockerClient dockerClient, ILogger logger ) { ContainerId = containerId; DockerClient = dockerClient; Logger = logger; } public async Task AttachAsync() { // Fetch initial container logs Logger.LogTrace("Fetching initial container logs"); using var logStream = await DockerClient.Containers.GetContainerLogsAsync( ContainerId, new ContainerLogsParameters() { Follow = false, ShowStderr = true, ShowStdout = true } ); // and process it await ProcessStreamAsync(logStream, Cts.Token); // After that we can actually start streaming the new logs Logger.LogTrace("Attaching to container"); Stream = await DockerClient.Containers.AttachContainerAsync( ContainerId, new ContainerAttachParameters() { Stderr = true, Stdin = true, Stdout = true, Stream = true }, Cts.Token ); Task.Run(async () => { Logger.LogTrace("Entered streaming loop"); while (!Cts.IsCancellationRequested) { try { if (Stream == null) // Triggers when e.g. a connection issue occurs cause the catch clause resets the stream { Logger.LogTrace("Reattaching to container"); Stream = await DockerClient.Containers.AttachContainerAsync( ContainerId, new ContainerAttachParameters() { Stderr = true, Stdin = true, Stdout = true, Stream = true }, Cts.Token ); } await ProcessStreamAsync(Stream, Cts.Token); } catch (OperationCanceledException) { // Ignored } catch (Exception e) { Logger.LogError(e, "An unhandled error occured while processing container stream"); } finally { Stream?.Dispose(); Stream = null; } } Logger.LogTrace("Exited streaming loop"); }); } private async Task ProcessStreamAsync(MultiplexedStream stream, CancellationToken cancellationToken) { const int bufferSize = 1024; var buffer = ArrayPool.Shared.Rent(bufferSize); while (!cancellationToken.IsCancellationRequested) { var readResult = await stream.ReadOutputAsync(buffer, 0, bufferSize, cancellationToken); if (readResult.Count > 0) { var decodedBuffer = Encoding.UTF8.GetString(buffer, 0, readResult.Count); await CacheLock.WaitAsync(cancellationToken); try { if (Cache.Count > 300) Cache.RemoveRange(0, 50); Cache.Add(decodedBuffer); } finally { CacheLock.Release(); } if (OnOutput != null) await OnOutput.Invoke(decodedBuffer); } if (readResult.EOF) break; } ArrayPool.Shared.Return(buffer); } public async Task WriteInputAsync(string value) { if (Stream == null) throw new AggregateException("Stream is not available. Container might not be attached"); var buffer = Encoding.UTF8.GetBytes(value); await Stream.WriteAsync(buffer, 0, buffer.Length, Cts.Token); } public async Task ClearCacheAsync() { await CacheLock.WaitAsync(Cts.Token); try { Cache.Clear(); } finally { CacheLock.Release(); } } public async Task GetCacheAsync() { await CacheLock.WaitAsync(); try { return Cache.ToArray(); } finally { CacheLock.Release(); } } public async ValueTask DisposeAsync() { Logger.LogTrace("Disposing"); await Cts.CancelAsync(); Stream?.Dispose(); CacheLock.Dispose(); } }