< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicDuplexConnectionWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs
Tag: 701_22528036593
Line coverage
98%
Covered lines: 61
Uncovered lines: 1
Coverable lines: 62
Total lines: 113
Line coverage: 98.3%
Branch coverage
100%
Covered branches: 2
Total branches: 2
Branch coverage: 100%
Method coverage
88%
Covered methods: 8
Total methods: 9
Method coverage: 88.8%

Metrics

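| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| get_WriterTask() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Advance(...) | 100% | 1 | 1 | 100% |
| DisposeAsync() | 100% | 2 | 2 | 100% |
| PerformDisposeAsync() | 100% | 1 | 1 | 100% |
| GetMemory(...) | 100% | 2 | 1 | 0% |
| GetSpan(...) | 100% | 1 | 1 | 100% |
| Flush() | 100% | 1 | 1 | 100% |
| Shutdown() | 100% | 1 | 1 | 100% |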

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6
 7namespace IceRpc.Transports.Slic.Internal;
 8
 9/// <summary>A helper class to write data to a duplex connection. Its methods shouldn't be called concurrently. The data
 10/// written to this writer is copied and buffered with an internal pipe. The data from the pipe is written on the duplex
 11/// connection with a background task.</summary>
 12internal class SlicDuplexConnectionWriter : IBufferWriter<byte>, IAsyncDisposable
 13{
 154714    internal Task WriterTask { get; private init; }
 15
 16    private readonly IDuplexConnection _connection;
 70317    private readonly CancellationTokenSource _disposeCts = new();
 18    private Task? _disposeTask;
 19    private readonly Pipe _pipe;
 20
 9019221    public void Advance(int bytes) => _pipe.Writer.Advance(bytes);
 22
 23    /// <inheritdoc/>
 24    public ValueTask DisposeAsync()
 70125    {
 70126        _disposeTask ??= PerformDisposeAsync();
 70127        return new(_disposeTask);
 28
 29        async Task PerformDisposeAsync()
 70130        {
 70131            _disposeCts.Cancel();
 32
 70133            await WriterTask.ConfigureAwait(false);
 34
 70135            _pipe.Reader.Complete();
 70136            _pipe.Writer.Complete();
 37
 70138            _disposeCts.Dispose();
 70139        }
 70140    }
 41
 42    /// <inheritdoc/>
 043    public Memory<byte> GetMemory(int sizeHint = 0) => _pipe.Writer.GetMemory(sizeHint);
 44
 45    /// <inheritdoc/>
 9019246    public Span<byte> GetSpan(int sizeHint = 0) => _pipe.Writer.GetSpan(sizeHint);
 47
 48    /// <summary>Constructs a duplex connection writer.</summary>
 49    /// <param name="connection">The duplex connection to write to.</param>
 50    /// <param name="pool">The memory pool to use.</param>
 51    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref
 52    /// name="pool"/>.</param>
 70353    internal SlicDuplexConnectionWriter(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 70354    {
 70355        _connection = connection;
 56
 57        // We set pauseWriterThreshold to 0 because Slic implements flow-control at the stream level. So there's no need
 58        // to limit the amount of data buffered by the writer pipe. The amount of data buffered is limited to
 59        // (MaxBidirectionalStreams + MaxUnidirectionalStreams) * PeerPauseWriterThreshold bytes.
 70360        _pipe = new Pipe(new PipeOptions(
 70361            pool: pool,
 70362            minimumSegmentSize: minimumSegmentSize,
 70363            pauseWriterThreshold: 0,
 70364            useSynchronizationContext: false));
 65
 70366        WriterTask = Task.Run(
 70367            async () =>
 70368            {
 70369                try
 70370                {
 522071                    while (true)
 522072                    {
 522073                        ReadResult readResult = await _pipe.Reader.ReadAsync(_disposeCts.Token).ConfigureAwait(false);
 70374
 466975                        if (!readResult.Buffer.IsEmpty)
 457276                        {
 457277                            await _connection.WriteAsync(readResult.Buffer, _disposeCts.Token).ConfigureAwait(false);
 456378                            _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 456379                        }
 70380
 466081                        if (readResult.IsCompleted)
 14382                        {
 14383                            await _connection.ShutdownWriteAsync(_disposeCts.Token).ConfigureAwait(false);
 14184                            break;
 70385                        }
 451786                    }
 14187                    _pipe.Reader.Complete();
 14188                }
 55889                catch (OperationCanceledException)
 55890                {
 70391                    // DisposeAsync was called.
 55892                }
 293                catch (Exception exception)
 294                {
 295                    _pipe.Reader.Complete(exception);
 296                }
 140497            });
 70398    }
 99
 100    internal void Flush()
 12934101    {
 12934102        ValueTask<FlushResult> flushResult = _pipe.Writer.FlushAsync(CancellationToken.None);
 103
 104        // PauseWriterThreshold is 0 so FlushAsync should always complete synchronously.
 12934105        Debug.Assert(flushResult.IsCompletedSuccessfully);
 12934106    }
 107
 108    /// <summary>Requests the shut down of the duplex connection writes after the buffered data is written on the
 109    /// duplex connection.</summary>
 110    internal void Shutdown() =>
 111        // Completing the pipe writer makes the background write task complete successfully.
 143112        _pipe.Writer.Complete();
 113}