< Summary

Information
Class: IceRpc.Transports.Internal.DuplexConnectionReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs
Tag: 276_17717543480
Line coverage
82%
Covered lines: 91
Uncovered lines: 19
Coverable lines: 110
Total lines: 208
Line coverage: 82.7%
Branch coverage
76%
Covered branches: 32
Total branches: 42
Branch coverage: 76.1%
Method coverage
100%
Covered methods: 10
Total methods: 10
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
Dispose()100%11100%
.ctor(...)100%11100%
AdvanceTo(...)100%11100%
AdvanceTo(...)100%11100%
FillBufferWriterAsync(...)83.33%1212100%
ReadFromConnectionAsync()87.5%10.37866.66%
ReadAsync(...)100%11100%
ReadAtLeastAsync(...)100%11100%
TryRead(...)66.66%66100%
ReadAsyncCore()68.75%22.211671.05%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Internal;
 9
 10/// <summary>A helper class to efficiently read data from a duplex connection. It provides a PipeReader-like API but is
 11/// not a PipeReader. Like a PipeReader, its methods shouldn't be called concurrently.</summary>
 12internal class DuplexConnectionReader : IDisposable
 13{
 14    private readonly IDuplexConnection _connection;
 15    private readonly Pipe _pipe;
 16
 17    public void Dispose()
 178918    {
 178919        _pipe.Writer.Complete();
 178920        _pipe.Reader.Complete();
 178921    }
 22
 23    /// <summary>Constructs a duplex connection reader.</summary>
 24    /// <param name="connection">The duplex connection to reader from.</param>
 25    /// <param name="pool">The memory pool to use.</param>
 26    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref name="pool"/>.
 27    /// </param>
 179128    internal DuplexConnectionReader(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 179129    {
 179130        _connection = connection;
 31
 32        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 33        // matter (_pipe.Writer.FlushAsync never blocks).
 179134        _pipe = new Pipe(new PipeOptions(
 179135            pool: pool,
 179136            minimumSegmentSize: minimumSegmentSize,
 179137            pauseWriterThreshold: 0,
 179138            useSynchronizationContext: false));
 179139    }
 40
 3533441    internal void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 42
 43    internal void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 16144        _pipe.Reader.AdvanceTo(consumed, examined);
 45
 46    /// <summary>Writes <paramref name="byteCount" /> bytes read from this pipe reader or its underlying connection
 47    /// into <paramref name="bufferWriter" />.</summary>
 48    internal ValueTask FillBufferWriterAsync(
 49        IBufferWriter<byte> bufferWriter,
 50        int byteCount,
 51        CancellationToken cancellationToken)
 2444752    {
 2444753        if (byteCount == 0)
 103554        {
 103555            return default;
 56        }
 57
 58        // If there's still data on the pipe reader, copy the data from the pipe reader synchronously.
 2341259        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2341260        {
 2341261            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 62
 2341263            ReadOnlySequence<byte> buffer = readResult.Buffer;
 2341264            if (buffer.Length > byteCount)
 642465            {
 642466                buffer = buffer.Slice(0, byteCount);
 642467            }
 68
 2341269            bufferWriter.Write(buffer);
 2341270            _pipe.Reader.AdvanceTo(buffer.End);
 71
 2341272            byteCount -= (int)buffer.Length;
 73
 2341274            if (byteCount == 0)
 1137375            {
 1137376                return default;
 77            }
 1203978        }
 79
 1203980        return ReadFromConnectionAsync(byteCount);
 81
 82        // Read the remaining bytes directly from the connection into the buffer writer.
 83        async ValueTask ReadFromConnectionAsync(int byteCount)
 1203984        {
 85            try
 1203986            {
 87                do
 10269088                {
 10269089                    Memory<byte> buffer = bufferWriter.GetMemory();
 10269090                    if (buffer.Length > byteCount)
 216891                    {
 216892                        buffer = buffer[0..byteCount];
 216893                    }
 94
 10269095                    int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 10268796                    bufferWriter.Advance(read);
 10268797                    byteCount -= read;
 98
 10268799                    if (byteCount > 0 && read == 0)
 0100                    {
 101                        // The peer gracefully shut down the connection but returned less data than expected, it's
 102                        // considered as an error.
 0103                        throw new InvalidDataException("Received less data than expected.");
 104                    }
 102687105                }
 102687106                while (byteCount > 0);
 12036107            }
 0108            catch (ObjectDisposedException exception)
 0109            {
 0110                throw new IceRpcException(
 0111                    IceRpcError.OperationAborted,
 0112                    "The read operation was aborted by the disposal of the duplex connection.",
 0113                    exception);
 114            }
 12036115        }
 24447116    }
 117
 118    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer can be empty if
 119    /// the peer shutdown its side of the connection.</summary>
 120    internal ValueTask<ReadOnlySequence<byte>> ReadAsync(CancellationToken cancellationToken = default) =>
 17635121        ReadAsyncCore(minimumSize: 1, canReturnEmptyBuffer: true, cancellationToken);
 122
 123    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer has always
 124    /// at least minimumSize bytes.</summary>
 125    internal ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = d
 9556126        ReadAsyncCore(minimumSize: minimumSize, canReturnEmptyBuffer: false, cancellationToken);
 127
 128    internal bool TryRead(out ReadOnlySequence<byte> buffer)
 27481129    {
 27481130        if (_pipe.Reader.TryRead(out ReadResult readResult))
 9846131        {
 9846132            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 9846133            buffer = readResult.Buffer;
 9846134            return true;
 135        }
 136        else
 17635137        {
 17635138            buffer = default;
 17635139            return false;
 140        }
 27481141    }
 142
 143    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer has always at
 144    /// least minimumSize bytes or if canReturnEmptyBuffer is true, the returned buffer can be empty if the peer
 145    /// shutdown the connection.</summary>
 146    private async ValueTask<ReadOnlySequence<byte>> ReadAsyncCore(
 147        int minimumSize,
 148        bool canReturnEmptyBuffer,
 149        CancellationToken cancellationToken = default)
 27191150    {
 27191151        Debug.Assert(minimumSize > 0);
 152
 153        // Read buffered data first.
 27191154        if (_pipe.Reader.TryRead(out ReadResult readResult))
 4712155        {
 4712156            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 4712157            if (readResult.Buffer.Length >= minimumSize)
 4712158            {
 4712159                return readResult.Buffer;
 160            }
 0161            _pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
 0162            minimumSize -= (int)readResult.Buffer.Length;
 0163        }
 164
 165        try
 22479166        {
 167            do
 22479168            {
 169                // Fill the pipe with data read from the connection.
 22479170                Memory<byte> buffer = _pipe.Writer.GetMemory();
 22479171                int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 172
 21228173                _pipe.Writer.Advance(read);
 21228174                minimumSize -= read;
 175
 176                // The peer shutdown its side of the connection, return an empty buffer if allowed.
 21228177                if (read == 0)
 268178                {
 268179                    if (canReturnEmptyBuffer)
 268180                    {
 268181                        break;
 182                    }
 183                    else
 0184                    {
 185                        // The connection was aborted or the peer gracefully shut down the connection but returned less
 186                        // data than expected.
 0187                        throw new IceRpcException(IceRpcError.ConnectionAborted);
 188                    }
 189                }
 20960190            }
 20960191            while (minimumSize > 0);
 21228192        }
 0193        catch (ObjectDisposedException exception)
 0194        {
 0195            throw new IceRpcException(
 0196                IceRpcError.OperationAborted,
 0197                "The read operation was aborted by the disposal of the duplex connection.",
 0198                exception);
 199        }
 200
 21228201        _ = await _pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 202
 21219203        _pipe.Reader.TryRead(out readResult);
 21219204        Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled);
 205
 21219206        return readResult.Buffer;
 25931207    }
 208}