< Summary

Information
Class: IceRpc.Transports.Internal.DuplexConnectionReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs
Tag: 275_13775359185
Line coverage
82%
Covered lines: 91
Uncovered lines: 19
Coverable lines: 110
Total lines: 208
Line coverage: 82.7%
Branch coverage
76%
Covered branches: 32
Total branches: 42
Branch coverage: 76.1%
Method coverage
100%
Covered methods: 10
Total methods: 10
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
Dispose()100%11100%
.ctor(...)100%11100%
AdvanceTo(...)100%11100%
AdvanceTo(...)100%11100%
FillBufferWriterAsync(...)83.33%1212100%
ReadFromConnectionAsync()87.5%10.37866.66%
ReadAsync(...)100%11100%
ReadAtLeastAsync(...)100%11100%
TryRead(...)66.66%66100%
ReadAsyncCore()68.75%22.211671.05%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Internal;
 9
 10/// <summary>A helper class to efficiently read data from a duplex connection. It provides a PipeReader-like API but is
 11/// not a PipeReader. Like a PipeReader, its methods shouldn't be called concurrently.</summary>
 12internal class DuplexConnectionReader : IDisposable
 13{
 14    private readonly IDuplexConnection _connection;
 15    private readonly Pipe _pipe;
 16
 17    public void Dispose()
 179318    {
 179319        _pipe.Writer.Complete();
 179320        _pipe.Reader.Complete();
 179321    }
 22
 23    /// <summary>Constructs a duplex connection reader.</summary>
 24    /// <param name="connection">The duplex connection to reader from.</param>
 25    /// <param name="pool">The memory pool to use.</param>
 26    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref name="pool"/>.
 27    /// </param>
 179528    internal DuplexConnectionReader(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 179529    {
 179530        _connection = connection;
 31
 32        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 33        // matter (_pipe.Writer.FlushAsync never blocks).
 179534        _pipe = new Pipe(new PipeOptions(
 179535            pool: pool,
 179536            minimumSegmentSize: minimumSegmentSize,
 179537            pauseWriterThreshold: 0,
 179538            useSynchronizationContext: false));
 179539    }
 40
 3495341    internal void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 42
 43    internal void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 16144        _pipe.Reader.AdvanceTo(consumed, examined);
 45
 46    /// <summary>Writes <paramref name="byteCount" /> bytes read from this pipe reader or its underlying connection
 47    /// into <paramref name="bufferWriter" />.</summary>
 48    internal ValueTask FillBufferWriterAsync(
 49        IBufferWriter<byte> bufferWriter,
 50        int byteCount,
 51        CancellationToken cancellationToken)
 2450452    {
 2450453        if (byteCount == 0)
 101754        {
 101755            return default;
 56        }
 57
 58        // If there's still data on the pipe reader, copy the data from the pipe reader synchronously.
 2348759        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2348760        {
 2348761            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 62
 2348763            ReadOnlySequence<byte> buffer = readResult.Buffer;
 2348764            if (buffer.Length > byteCount)
 667465            {
 667466                buffer = buffer.Slice(0, byteCount);
 667467            }
 68
 2348769            bufferWriter.Write(buffer);
 2348770            _pipe.Reader.AdvanceTo(buffer.End);
 71
 2348772            byteCount -= (int)buffer.Length;
 73
 2348774            if (byteCount == 0)
 1142575            {
 1142576                return default;
 77            }
 1206278        }
 79
 1206280        return ReadFromConnectionAsync(byteCount);
 81
 82        // Read the remaining bytes directly from the connection into the buffer writer.
 83        async ValueTask ReadFromConnectionAsync(int byteCount)
 1206284        {
 85            try
 1206286            {
 87                do
 11110088                {
 11110089                    Memory<byte> buffer = bufferWriter.GetMemory();
 11110090                    if (buffer.Length > byteCount)
 223491                    {
 223492                        buffer = buffer[0..byteCount];
 223493                    }
 94
 11110095                    int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 11109496                    bufferWriter.Advance(read);
 11109497                    byteCount -= read;
 98
 11109499                    if (byteCount > 0 && read == 0)
 0100                    {
 101                        // The peer gracefully shut down the connection but returned less data than expected, it's
 102                        // considered as an error.
 0103                        throw new InvalidDataException("Received less data than expected.");
 104                    }
 111094105                }
 111094106                while (byteCount > 0);
 12056107            }
 0108            catch (ObjectDisposedException exception)
 0109            {
 0110                throw new IceRpcException(
 0111                    IceRpcError.OperationAborted,
 0112                    "The read operation was aborted by the disposal of the duplex connection.",
 0113                    exception);
 114            }
 12056115        }
 24504116    }
 117
 118    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer can be empty if
 119    /// the peer shutdown its side of the connection.</summary>
 120    internal ValueTask<ReadOnlySequence<byte>> ReadAsync(CancellationToken cancellationToken = default) =>
 17067121        ReadAsyncCore(minimumSize: 1, canReturnEmptyBuffer: true, cancellationToken);
 122
 123    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer has always
 124    /// at least minimumSize bytes.</summary>
 125    internal ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = d
 9377126        ReadAsyncCore(minimumSize: minimumSize, canReturnEmptyBuffer: false, cancellationToken);
 127
 128    internal bool TryRead(out ReadOnlySequence<byte> buffer)
 27278129    {
 27278130        if (_pipe.Reader.TryRead(out ReadResult readResult))
 10211131        {
 10211132            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 10211133            buffer = readResult.Buffer;
 10211134            return true;
 135        }
 136        else
 17067137        {
 17067138            buffer = default;
 17067139            return false;
 140        }
 27278141    }
 142
 143    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer has always at
 144    /// least minimumSize bytes or if canReturnEmptyBuffer is true, the returned buffer can be empty if the peer
 145    /// shutdown the connection.</summary>
 146    private async ValueTask<ReadOnlySequence<byte>> ReadAsyncCore(
 147        int minimumSize,
 148        bool canReturnEmptyBuffer,
 149        CancellationToken cancellationToken = default)
 26444150    {
 26444151        Debug.Assert(minimumSize > 0);
 152
 153        // Read buffered data first.
 26444154        if (_pipe.Reader.TryRead(out ReadResult readResult))
 4501155        {
 4501156            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 4501157            if (readResult.Buffer.Length >= minimumSize)
 4501158            {
 4501159                return readResult.Buffer;
 160            }
 0161            _pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
 0162            minimumSize -= (int)readResult.Buffer.Length;
 0163        }
 164
 165        try
 21943166        {
 167            do
 21943168            {
 169                // Fill the pipe with data read from the connection.
 21943170                Memory<byte> buffer = _pipe.Writer.GetMemory();
 21943171                int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 172
 20682173                _pipe.Writer.Advance(read);
 20682174                minimumSize -= read;
 175
 176                // The peer shutdown its side of the connection, return an empty buffer if allowed.
 20682177                if (read == 0)
 262178                {
 262179                    if (canReturnEmptyBuffer)
 262180                    {
 262181                        break;
 182                    }
 183                    else
 0184                    {
 185                        // The connection was aborted or the peer gracefully shut down the connection but returned less
 186                        // data than expected.
 0187                        throw new IceRpcException(IceRpcError.ConnectionAborted);
 188                    }
 189                }
 20420190            }
 20420191            while (minimumSize > 0);
 20682192        }
 0193        catch (ObjectDisposedException exception)
 0194        {
 0195            throw new IceRpcException(
 0196                IceRpcError.OperationAborted,
 0197                "The read operation was aborted by the disposal of the duplex connection.",
 0198                exception);
 199        }
 200
 20682201        _ = await _pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 202
 20678203        _pipe.Reader.TryRead(out readResult);
 20678204        Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled);
 205
 20678206        return readResult.Buffer;
 25179207    }
 208}