< Summary

Information
Class: IceRpc.Transports.Internal.DuplexConnectionReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs
Tag: 592_20856082467
Line coverage
82%
Covered lines: 91
Uncovered lines: 19
Coverable lines: 110
Total lines: 208
Line coverage: 82.7%
Branch coverage
76%
Covered branches: 32
Total branches: 42
Branch coverage: 76.1%
Method coverage
100%
Covered methods: 10
Total methods: 10
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
Dispose() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
AdvanceTo(...) | 100% | 1 | 1 | 100%
AdvanceTo(...) | 100% | 1 | 1 | 100%
FillBufferWriterAsync(...) | 83.33% | 12 | 12 | 100%
ReadFromConnectionAsync() | 87.5% | 10.37 | 8 | 66.66%
ReadAsync(...) | 100% | 1 | 1 | 100%
ReadAtLeastAsync(...) | 100% | 1 | 1 | 100%
TryRead(...) | 66.66% | 6 | 6 | 100%
ReadAsyncCore() | 68.75% | 22.21 | 16 | 71.05%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Internal;
 9
 10/// <summary>A helper class to efficiently read data from a duplex connection. It provides a PipeReader-like API but is
 11/// not a PipeReader. Like a PipeReader, its methods shouldn't be called concurrently.</summary>
 12internal class DuplexConnectionReader : IDisposable
 13{
 14    private readonly IDuplexConnection _connection;
 15    private readonly Pipe _pipe;
 16
 17    public void Dispose()
 91518    {
 91519        _pipe.Writer.Complete();
 91520        _pipe.Reader.Complete();
 91521    }
 22
 23    /// <summary>Constructs a duplex connection reader.</summary>
 24    /// <param name="connection">The duplex connection to read from.</param>
 25    /// <param name="pool">The memory pool to use.</param>
 26    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref name="pool"/>.
 27    /// </param>
 91628    internal DuplexConnectionReader(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 91629    {
 91630        _connection = connection;
 31
 32        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 33        // matter (_pipe.Writer.FlushAsync never blocks).
 91634        _pipe = new Pipe(new PipeOptions(
 91635            pool: pool,
 91636            minimumSegmentSize: minimumSegmentSize,
 91637            pauseWriterThreshold: 0,
 91638            useSynchronizationContext: false));
 91639    }
 40
 1772541    internal void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 42
 43    internal void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 8344        _pipe.Reader.AdvanceTo(consumed, examined);
 45
 46    /// <summary>Writes <paramref name="byteCount" /> bytes read from this pipe reader or its underlying connection
 47    /// into <paramref name="bufferWriter" />.</summary>
 48    internal ValueTask FillBufferWriterAsync(
 49        IBufferWriter<byte> bufferWriter,
 50        int byteCount,
 51        CancellationToken cancellationToken)
 1236352    {
 1236353        if (byteCount == 0)
 52054        {
 52055            return default;
 56        }
 57
 58        // If there's still data on the pipe reader, copy the data from the pipe reader synchronously.
 1184359        if (_pipe.Reader.TryRead(out ReadResult readResult))
 1184360        {
 1184361            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 62
 1184363            ReadOnlySequence<byte> buffer = readResult.Buffer;
 1184364            if (buffer.Length > byteCount)
 327965            {
 327966                buffer = buffer.Slice(0, byteCount);
 327967            }
 68
 1184369            bufferWriter.Write(buffer);
 1184370            _pipe.Reader.AdvanceTo(buffer.End);
 71
 1184372            byteCount -= (int)buffer.Length;
 73
 1184374            if (byteCount == 0)
 580675            {
 580676                return default;
 77            }
 603778        }
 79
 603780        return ReadFromConnectionAsync(byteCount);
 81
 82        // Read the remaining bytes directly from the connection into the buffer writer.
 83        async ValueTask ReadFromConnectionAsync(int byteCount)
 603784        {
 85            try
 603786            {
 87                do
 5339888                {
 5339889                    Memory<byte> buffer = bufferWriter.GetMemory();
 5339890                    if (buffer.Length > byteCount)
 110591                    {
 110592                        buffer = buffer[0..byteCount];
 110593                    }
 94
 5339895                    int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 5339696                    bufferWriter.Advance(read);
 5339697                    byteCount -= read;
 98
 5339699                    if (byteCount > 0 && read == 0)
 0100                    {
 101                        // The peer gracefully shut down the connection but returned less data than expected;
 102                        // this is considered an error.
 0103                        throw new InvalidDataException("Received less data than expected.");
 104                    }
 53396105                }
 53396106                while (byteCount > 0);
 6035107            }
 0108            catch (ObjectDisposedException exception)
 0109            {
 0110                throw new IceRpcException(
 0111                    IceRpcError.OperationAborted,
 0112                    "The read operation was aborted by the disposal of the duplex connection.",
 0113                    exception);
 114            }
 6035115        }
 12363116    }
 117
 118    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer can be empty if
 119    /// the peer shut down its side of the connection.</summary>
 120    internal ValueTask<ReadOnlySequence<byte>> ReadAsync(CancellationToken cancellationToken = default) =>
 8779121        ReadAsyncCore(minimumSize: 1, canReturnEmptyBuffer: true, cancellationToken);
 122
 123    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer always has
 124    /// at least minimumSize bytes.</summary>
 125    internal ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = d
 4790126        ReadAsyncCore(minimumSize: minimumSize, canReturnEmptyBuffer: false, cancellationToken);
 127
 128    internal bool TryRead(out ReadOnlySequence<byte> buffer)
 13805129    {
 13805130        if (_pipe.Reader.TryRead(out ReadResult readResult))
 5026131        {
 5026132            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 5026133            buffer = readResult.Buffer;
 5026134            return true;
 135        }
 136        else
 8779137        {
 8779138            buffer = default;
 8779139            return false;
 140        }
 13805141    }
 142
 143    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer always has at
 144    /// least minimumSize bytes, unless canReturnEmptyBuffer is true and the peer shut down the connection, in which
 145    /// case the returned buffer can be empty.</summary>
 146    private async ValueTask<ReadOnlySequence<byte>> ReadAsyncCore(
 147        int minimumSize,
 148        bool canReturnEmptyBuffer,
 149        CancellationToken cancellationToken = default)
 13569150    {
 13569151        Debug.Assert(minimumSize > 0);
 152
 153        // Read buffered data first.
 13569154        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2288155        {
 2288156            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 2288157            if (readResult.Buffer.Length >= minimumSize)
 2288158            {
 2288159                return readResult.Buffer;
 160            }
 0161            _pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
 0162            minimumSize -= (int)readResult.Buffer.Length;
 0163        }
 164
 165        try
 11281166        {
 167            do
 11281168            {
 169                // Fill the pipe with data read from the connection.
 11281170                Memory<byte> buffer = _pipe.Writer.GetMemory();
 11281171                int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 172
 10641173                _pipe.Writer.Advance(read);
 10641174                minimumSize -= read;
 175
 176                // The peer shut down its side of the connection; return an empty buffer if allowed.
 10641177                if (read == 0)
 137178                {
 137179                    if (canReturnEmptyBuffer)
 137180                    {
 137181                        break;
 182                    }
 183                    else
 0184                    {
 185                        // The connection was aborted or the peer gracefully shut down the connection but returned less
 186                        // data than expected.
 0187                        throw new IceRpcException(IceRpcError.ConnectionAborted);
 188                    }
 189                }
 10504190            }
 10504191            while (minimumSize > 0);
 10641192        }
 0193        catch (ObjectDisposedException exception)
 0194        {
 0195            throw new IceRpcException(
 0196                IceRpcError.OperationAborted,
 0197                "The read operation was aborted by the disposal of the duplex connection.",
 0198                exception);
 199        }
 200
 10641201        _ = await _pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 202
 10638203        _pipe.Reader.TryRead(out readResult);
 10638204        Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled);
 205
 10638206        return readResult.Buffer;
 12926207    }
 208}