< Summary

Information
Class: IceRpc.Transports.Internal.DuplexConnectionReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs
Tag: 1321_24790053727
Line coverage
82%
Covered lines: 91
Uncovered lines: 19
Coverable lines: 110
Total lines: 208
Line coverage: 82.7%
Branch coverage
76%
Covered branches: 32
Total branches: 42
Branch coverage: 76.1%
Method coverage
100%
Covered methods: 10
Fully covered methods: 7
Total methods: 10
Method coverage: 100%
Full method coverage: 70%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
Dispose()100%11100%
.ctor(...)100%11100%
AdvanceTo(...)100%11100%
AdvanceTo(...)100%11100%
FillBufferWriterAsync(...)83.33%1212100%
ReadFromConnectionAsync()87.5%10866.66%
ReadAsync(...)100%11100%
ReadAtLeastAsync(...)100%11100%
TryRead(...)66.66%66100%
ReadAsyncCore()68.75%221671.05%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Internal;
 9
 10/// <summary>A helper class to efficiently read data from a duplex connection. It provides a PipeReader-like API but is
 11/// not a PipeReader. Like a PipeReader, its methods shouldn't be called concurrently.</summary>
 12internal class DuplexConnectionReader : IDisposable
 13{
 14    private readonly IDuplexConnection _connection;
 15    private readonly Pipe _pipe;
 16
 17    public void Dispose()
 94418    {
 94419        _pipe.Writer.Complete();
 94420        _pipe.Reader.Complete();
 94421    }
 22
 23    /// <summary>Constructs a duplex connection reader.</summary>
 24    /// <param name="connection">The duplex connection to reader from.</param>
 25    /// <param name="pool">The memory pool to use.</param>
 26    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref name="pool"/>.
 27    /// </param>
 94528    internal DuplexConnectionReader(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 94529    {
 94530        _connection = connection;
 31
 32        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 33        // matter (_pipe.Writer.FlushAsync never blocks).
 94534        _pipe = new Pipe(new PipeOptions(
 94535            pool: pool,
 94536            minimumSegmentSize: minimumSegmentSize,
 94537            pauseWriterThreshold: 0,
 94538            useSynchronizationContext: false));
 94539    }
 40
 1814841    internal void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 42
 43    internal void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 9644        _pipe.Reader.AdvanceTo(consumed, examined);
 45
 46    /// <summary>Writes <paramref name="byteCount" /> bytes read from this pipe reader or its underlying connection
 47    /// into <paramref name="bufferWriter" />.</summary>
 48    internal ValueTask FillBufferWriterAsync(
 49        IBufferWriter<byte> bufferWriter,
 50        int byteCount,
 51        CancellationToken cancellationToken)
 1238052    {
 1238053        if (byteCount == 0)
 53654        {
 53655            return default;
 56        }
 57
 58        // If there's still data on the pipe reader, copy the data from the pipe reader synchronously.
 1184459        if (_pipe.Reader.TryRead(out ReadResult readResult))
 1184460        {
 1184461            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 62
 1184463            ReadOnlySequence<byte> buffer = readResult.Buffer;
 1184464            if (buffer.Length > byteCount)
 279065            {
 279066                buffer = buffer.Slice(0, byteCount);
 279067            }
 68
 1184469            bufferWriter.Write(buffer);
 1184470            _pipe.Reader.AdvanceTo(buffer.End);
 71
 1184472            byteCount -= (int)buffer.Length;
 73
 1184474            if (byteCount == 0)
 579475            {
 579476                return default;
 77            }
 605078        }
 79
 605080        return ReadFromConnectionAsync(byteCount);
 81
 82        // Read the remaining bytes directly from the connection into the buffer writer.
 83        async ValueTask ReadFromConnectionAsync(int byteCount)
 605084        {
 85            try
 605086            {
 87                do
 5341588                {
 5341589                    Memory<byte> buffer = bufferWriter.GetMemory();
 5341590                    if (buffer.Length > byteCount)
 109291                    {
 109292                        buffer = buffer[0..byteCount];
 109293                    }
 94
 5341595                    int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 5341496                    bufferWriter.Advance(read);
 5341497                    byteCount -= read;
 98
 5341499                    if (byteCount > 0 && read == 0)
 0100                    {
 101                        // The peer gracefully shut down the connection but returned less data than expected, it's
 102                        // considered as an error.
 0103                        throw new InvalidDataException("Received less data than expected.");
 104                    }
 53414105                }
 53414106                while (byteCount > 0);
 6049107            }
 0108            catch (ObjectDisposedException exception)
 0109            {
 0110                throw new IceRpcException(
 0111                    IceRpcError.OperationAborted,
 0112                    "The read operation was aborted by the disposal of the duplex connection.",
 0113                    exception);
 114            }
 6049115        }
 12380116    }
 117
 118    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer can be empty if
 119    /// the peer shutdown its side of the connection.</summary>
 120    internal ValueTask<ReadOnlySequence<byte>> ReadAsync(CancellationToken cancellationToken = default) =>
 9488121        ReadAsyncCore(minimumSize: 1, canReturnEmptyBuffer: true, cancellationToken);
 122
 123    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer has always
 124    /// at least minimumSize bytes.</summary>
 125    internal ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = d
 5019126        ReadAsyncCore(minimumSize: minimumSize, canReturnEmptyBuffer: false, cancellationToken);
 127
 128    internal bool TryRead(out ReadOnlySequence<byte> buffer)
 14043129    {
 14043130        if (_pipe.Reader.TryRead(out ReadResult readResult))
 4555131        {
 4555132            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 4555133            buffer = readResult.Buffer;
 4555134            return true;
 135        }
 136        else
 9488137        {
 9488138            buffer = default;
 9488139            return false;
 140        }
 14043141    }
 142
 143    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer has always at
 144    /// least minimumSize bytes or if canReturnEmptyBuffer is true, the returned buffer can be empty if the peer
 145    /// shutdown the connection.</summary>
 146    private async ValueTask<ReadOnlySequence<byte>> ReadAsyncCore(
 147        int minimumSize,
 148        bool canReturnEmptyBuffer,
 149        CancellationToken cancellationToken = default)
 14507150    {
 14507151        Debug.Assert(minimumSize > 0);
 152
 153        // Read buffered data first.
 14507154        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2227155        {
 2227156            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 2227157            if (readResult.Buffer.Length >= minimumSize)
 2227158            {
 2227159                return readResult.Buffer;
 160            }
 0161            _pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
 0162            minimumSize -= (int)readResult.Buffer.Length;
 0163        }
 164
 165        try
 12280166        {
 167            do
 12280168            {
 169                // Fill the pipe with data read from the connection.
 12280170                Memory<byte> buffer = _pipe.Writer.GetMemory();
 12280171                int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 172
 11617173                _pipe.Writer.Advance(read);
 11617174                minimumSize -= read;
 175
 176                // The peer shutdown its side of the connection, return an empty buffer if allowed.
 11617177                if (read == 0)
 142178                {
 142179                    if (canReturnEmptyBuffer)
 142180                    {
 142181                        break;
 182                    }
 183                    else
 0184                    {
 185                        // The connection was aborted or the peer gracefully shut down the connection but returned less
 186                        // data than expected.
 0187                        throw new IceRpcException(IceRpcError.ConnectionAborted);
 188                    }
 189                }
 11475190            }
 11475191            while (minimumSize > 0);
 11617192        }
 0193        catch (ObjectDisposedException exception)
 0194        {
 0195            throw new IceRpcException(
 0196                IceRpcError.OperationAborted,
 0197                "The read operation was aborted by the disposal of the duplex connection.",
 0198                exception);
 199        }
 200
 11617201        _ = await _pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 202
 11612203        _pipe.Reader.TryRead(out readResult);
 11612204        Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled);
 205
 11612206        return readResult.Buffer;
 13839207    }
 208}