< Summary

Information
Class: IceRpc.Transports.Internal.DuplexConnectionReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs
Tag: 1856_27024993493
Line coverage
82%
Covered lines: 91
Uncovered lines: 19
Coverable lines: 110
Total lines: 208
Line coverage: 82.7%
Branch coverage
73%
Covered branches: 31
Total branches: 42
Branch coverage: 73.8%
Method coverage
100%
Covered methods: 10
Fully covered methods: 7
Total methods: 10
Method coverage: 100%
Full method coverage: 70%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
Dispose() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
AdvanceTo(...) | 100% | 1 | 1 | 100%
AdvanceTo(...) | 100% | 1 | 1 | 100%
FillBufferWriterAsync(...) | 75% | 12 | 12 | 100%
ReadFromConnectionAsync() | 87.5% | 10 | 8 | 66.66%
ReadAsync(...) | 100% | 1 | 1 | 100%
ReadAtLeastAsync(...) | 100% | 1 | 1 | 100%
TryRead(...) | 66.66% | 6 | 6 | 100%
ReadAsyncCore() | 68.75% | 22 | 16 | 71.05%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Internal/DuplexConnectionReader.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Internal;
 9
 10/// <summary>A helper class to efficiently read data from a duplex connection. It provides a PipeReader-like API but is
 11/// not a PipeReader. Like a PipeReader, its methods shouldn't be called concurrently.</summary>
 12internal class DuplexConnectionReader : IDisposable
 13{
 14    private readonly IDuplexConnection _connection;
 15    private readonly Pipe _pipe;
 16
 17    public void Dispose()
 100318    {
 100319        _pipe.Writer.Complete();
 100320        _pipe.Reader.Complete();
 100321    }
 22
 23    /// <summary>Constructs a duplex connection reader.</summary>
 24    /// <param name="connection">The duplex connection to read from.</param>
 25    /// <param name="pool">The memory pool to use.</param>
 26    /// <param name="minimumSegmentSize">The minimum segment size for buffers allocated from <paramref name="pool"/>.
 27    /// </param>
 100428    internal DuplexConnectionReader(IDuplexConnection connection, MemoryPool<byte> pool, int minimumSegmentSize)
 100429    {
 100430        _connection = connection;
 31
 32        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 33        // matter (_pipe.Writer.FlushAsync never blocks).
 100434        _pipe = new Pipe(new PipeOptions(
 100435            pool: pool,
 100436            minimumSegmentSize: minimumSegmentSize,
 100437            pauseWriterThreshold: 0,
 100438            useSynchronizationContext: false));
 100439    }
 40
 1760941    internal void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 42
 43    internal void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 10044        _pipe.Reader.AdvanceTo(consumed, examined);
 45
 46    /// <summary>Writes <paramref name="byteCount" /> bytes read from this pipe reader or its underlying connection
 47    /// into <paramref name="bufferWriter" />.</summary>
 48    internal ValueTask FillBufferWriterAsync(
 49        IBufferWriter<byte> bufferWriter,
 50        int byteCount,
 51        CancellationToken cancellationToken)
 1154152    {
 1154153        if (byteCount == 0)
 56554        {
 56555            return default;
 56        }
 57
 58        // If there's still data on the pipe reader, copy the data from the pipe reader synchronously.
 1097659        if (_pipe.Reader.TryRead(out ReadResult readResult))
 1097660        {
 1097661            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 62
 1097663            ReadOnlySequence<byte> buffer = readResult.Buffer;
 1097664            if (buffer.Length > byteCount)
 313665            {
 313666                buffer = buffer.Slice(0, byteCount);
 313667            }
 68
 1097669            bufferWriter.Write(buffer);
 1097670            _pipe.Reader.AdvanceTo(buffer.End);
 71
 1097672            byteCount -= (int)buffer.Length;
 73
 1097674            if (byteCount == 0)
 585175            {
 585176                return default;
 77            }
 512578        }
 79
 512580        return ReadFromConnectionAsync(byteCount);
 81
 82        // Read the remaining bytes directly from the connection into the buffer writer.
 83        async ValueTask ReadFromConnectionAsync(int byteCount)
 512584        {
 85            try
 512586            {
 87                do
 3788288                {
 3788289                    Memory<byte> buffer = bufferWriter.GetMemory();
 3788290                    if (buffer.Length > byteCount)
 108691                    {
 108692                        buffer = buffer[0..byteCount];
 108693                    }
 94
 3788295                    int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 3788296                    bufferWriter.Advance(read);
 3788297                    byteCount -= read;
 98
 3788299                    if (byteCount > 0 && read == 0)
 0100                    {
 101                        // The peer gracefully shut down the connection but returned less data than expected; this
 102                        // is considered an error.
 0103                        throw new InvalidDataException("Received less data than expected.");
 104                    }
 37882105                }
 37882106                while (byteCount > 0);
 5125107            }
 0108            catch (ObjectDisposedException exception)
 0109            {
 0110                throw new IceRpcException(
 0111                    IceRpcError.OperationAborted,
 0112                    "The read operation was aborted by the disposal of the duplex connection.",
 0113                    exception);
 114            }
 5125115        }
 11541116    }
 117
 118    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer can be empty if
 119    /// the peer shut down its side of the connection.</summary>
 120    internal ValueTask<ReadOnlySequence<byte>> ReadAsync(CancellationToken cancellationToken = default) =>
 8519121        ReadAsyncCore(minimumSize: 1, canReturnEmptyBuffer: true, cancellationToken);
 122
 123    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer always has
 124    /// at least minimumSize bytes.</summary>
 125    internal ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = d
 5172126        ReadAsyncCore(minimumSize: minimumSize, canReturnEmptyBuffer: false, cancellationToken);
 127
 128    internal bool TryRead(out ReadOnlySequence<byte> buffer)
 13405129    {
 13405130        if (_pipe.Reader.TryRead(out ReadResult readResult))
 4886131        {
 4886132            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 4886133            buffer = readResult.Buffer;
 4886134            return true;
 135        }
 136        else
 8519137        {
 8519138            buffer = default;
 8519139            return false;
 140        }
 13405141    }
 142
 143    /// <summary>Reads and returns bytes from the underlying transport connection. The returned buffer always has at
 144    /// least minimumSize bytes or, if canReturnEmptyBuffer is true, the returned buffer can be empty if the peer
 145    /// shut down the connection.</summary>
 146    private async ValueTask<ReadOnlySequence<byte>> ReadAsyncCore(
 147        int minimumSize,
 148        bool canReturnEmptyBuffer,
 149        CancellationToken cancellationToken = default)
 13691150    {
 13691151        Debug.Assert(minimumSize > 0);
 152
 153        // Read buffered data first.
 13691154        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2540155        {
 2540156            Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled && !readResult.Buffer.IsEmpty);
 2540157            if (readResult.Buffer.Length >= minimumSize)
 2540158            {
 2540159                return readResult.Buffer;
 160            }
 0161            _pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
 0162            minimumSize -= (int)readResult.Buffer.Length;
 0163        }
 164
 165        try
 11151166        {
 167            do
 11151168            {
 169                // Fill the pipe with data read from the connection.
 11151170                Memory<byte> buffer = _pipe.Writer.GetMemory();
 11151171                int read = await _connection.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
 172
 10437173                _pipe.Writer.Advance(read);
 10437174                minimumSize -= read;
 175
 176                // The peer shut down its side of the connection; return an empty buffer if allowed.
 10437177                if (read == 0)
 139178                {
 139179                    if (canReturnEmptyBuffer)
 139180                    {
 139181                        break;
 182                    }
 183                    else
 0184                    {
 185                        // The connection was aborted or the peer gracefully shut down the connection but returned less
 186                        // data than expected.
 0187                        throw new IceRpcException(IceRpcError.ConnectionAborted);
 188                    }
 189                }
 10298190            }
 10298191            while (minimumSize > 0);
 10437192        }
 0193        catch (ObjectDisposedException exception)
 0194        {
 0195            throw new IceRpcException(
 0196                IceRpcError.OperationAborted,
 0197                "The read operation was aborted by the disposal of the duplex connection.",
 0198                exception);
 199        }
 200
 10437201        _ = await _pipe.Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 202
 10430203        _pipe.Reader.TryRead(out readResult);
 10430204        Debug.Assert(!readResult.IsCompleted && !readResult.IsCanceled);
 205
 10430206        return readResult.Buffer;
 12970207    }
 208}