< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs
Tag: 275_13775359185
Line coverage
81%
Covered lines: 116
Uncovered lines: 27
Coverable lines: 143
Total lines: 287
Line coverage: 81.1%
Branch coverage
76%
Covered branches: 32
Total branches: 42
Branch coverage: 76.1%
Method coverage
100%
Covered methods: 11
Total methods: 11
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
AdvanceTo(...)100%11100%
AdvanceTo(...)100%22100%
CancelPendingRead()100%11100%
Complete(...)100%22100%
ReadAsync()100%66100%
TryRead(...)62.5%11.54861.9%
.ctor(...)100%11100%
CompleteReads(...)100%44100%
ReceivedDataFrameAsync()78.57%18.081472.5%
GetReadResult(...)25%6450%
ThrowIfCompleted()50%2.26260%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// The SlicPipeReader doesn't override ReadAtLeastAsyncCore or CopyToAsync methods because:
 11// - we can't forward the calls to the internal pipe reader since reading relies on the AdvanceTo implementation to send
 12//   the StreamWindowUpdate frame once the data is examined,
 13// - the default implementation can't be much optimized.
 14internal class SlicPipeReader : PipeReader
 15{
 16    private int _examined;
 17    private volatile Exception? _exception;
 18    private long _lastExaminedOffset;
 19    private readonly Pipe _pipe;
 20    private ReadResult _readResult;
 21    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 22    // locking.
 23    private int _state;
 24    private readonly SlicStream _stream;
 25    private int _windowSize;
 26
 628627    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);
 28
 29    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
 1204330    {
 1204331        ThrowIfCompleted();
 32
 1204333        long startOffset = _readResult.Buffer.GetOffset(_readResult.Buffer.Start);
 1204334        long consumedOffset = _readResult.Buffer.GetOffset(consumed) - startOffset;
 1204335        long examinedOffset = _readResult.Buffer.GetOffset(examined) - startOffset;
 36
 37        // Add the additional examined bytes to the examined bytes total.
 1204338        _examined += (int)(examinedOffset - _lastExaminedOffset);
 1204339        _lastExaminedOffset = examinedOffset - consumedOffset;
 40
 41        // If the number of examined bytes is superior to the window update threshold, notifies the stream of the window
 42        // update. This will trigger the sending of a window update frame and allow the sender to send additional data.
 1204343        if (_examined >= _stream.WindowUpdateThreshold)
 198744        {
 198745            Interlocked.Add(ref _windowSize, _examined);
 198746            _stream.WindowUpdate(_examined);
 198747            _examined = 0;
 198748        }
 49
 1204350        _pipe.Reader.AdvanceTo(consumed, examined);
 1204351    }
 52
 453    public override void CancelPendingRead() => _pipe.Reader.CancelPendingRead();
 54
 55    public override void Complete(Exception? exception = null)
 527956    {
 527957        if (_state.TrySetFlag(State.Completed))
 523758        {
 59            // Forcefully close the stream reads if reads were not already gracefully closed by ReadAsync or TryRead.
 523760            _stream.CloseReads(graceful: false);
 61
 523762            CompleteReads(exception: null);
 63
 523764            _pipe.Reader.Complete();
 523765        }
 527966    }
 67
 68    public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
 1263869    {
 1263870        ThrowIfCompleted();
 71
 1263872        if (_exception is not null)
 2573        {
 2574            _stream.ThrowIfConnectionClosed();
 1775        }
 76
 1263077        ReadResult result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 1211878        if (result.IsCanceled)
 479        {
 480            return GetReadResult(result);
 81        }
 82
 83        // Cache the read result for the implementation of AdvanceTo that needs to figure out how much data got examined
 84        // and consumed.
 1211485        _readResult = result;
 86
 87        // All the data from the peer is considered read at this point. It's time to close reads on the stream. This
 88        // will write the StreamReadsClosed frame to the peer and allow it to release the stream semaphore.
 1211489        if (result.IsCompleted)
 244890        {
 244891            _stream.CloseReads(graceful: true);
 244892        }
 93
 1211494        return result;
 1211895    }
 96
 97    public override bool TryRead(out ReadResult result)
 7198    {
 7199        ThrowIfCompleted();
 100
 71101        if (_exception is not null)
 0102        {
 0103            _stream.ThrowIfConnectionClosed();
 0104        }
 105
 71106        if (_pipe.Reader.TryRead(out result))
 71107        {
 71108            if (result.IsCanceled)
 0109            {
 0110                result = GetReadResult(result);
 0111                return true;
 112            }
 113
 114            // Cache the read result for the implementation of AdvanceTo that needs to figure out how much data got
 115            // examined and consumed.
 71116            _readResult = result;
 117
 118            // All the data from the peer is considered read at this point. It's time to close reads on the stream. This
 119            // will write the StreamReadsClosed frame to the peer and allow it to release the stream semaphore.
 71120            if (result.IsCompleted)
 71121            {
 71122                _stream.CloseReads(graceful: true);
 71123            }
 71124            return true;
 125        }
 126        else
 0127        {
 0128            return false;
 129        }
 71130    }
 131
 5297132    internal SlicPipeReader(SlicStream stream, SlicConnection connection)
 5297133    {
 5297134        _stream = stream;
 5297135        _windowSize = connection.InitialStreamWindowSize;
 136
 137        // We keep the default readerScheduler (ThreadPool) because the _pipe.Writer.FlushAsync executes in the
 138        // "read loop task" and we don't want this task to continue into application code. The writerScheduler
 139        // doesn't matter since _pipe.Writer.FlushAsync never blocks.
 5297140        _pipe = new(new PipeOptions(
 5297141            pool: connection.Pool,
 5297142            pauseWriterThreshold: 0,
 5297143            minimumSegmentSize: connection.MinSegmentSize,
 5297144            useSynchronizationContext: false));
 5297145    }
 146
 147    /// <summary>Completes reads.</summary>
 148    /// <param name="exception">The exception that will be raised by <see cref="ReadAsync" /> or <see cref="TryRead" />
 149    /// operation.</param>
 150    internal void CompleteReads(Exception? exception)
 5930151    {
 5930152        Interlocked.CompareExchange(ref _exception, exception, null);
 153
 5930154        if (_state.TrySetFlag(State.PipeWriterCompleted))
 5291155        {
 5291156            if (_state.HasFlag(State.PipeWriterInUse))
 152157            {
 152158                _pipe.Reader.CancelPendingRead();
 152159            }
 160            else
 5139161            {
 5139162                _pipe.Writer.Complete(exception);
 5139163            }
 5291164        }
 5930165    }
 166
 167    /// <summary>Notifies the reader of the reception of a <see cref="FrameType.Stream" /> or <see
 168    /// cref="FrameType.StreamLast" /> frame. The stream data is consumed from the connection and buffered by this
 169    /// reader on its internal pipe.</summary>
 170    /// <returns><see langword="true" /> if the data was consumed; otherwise, <see langword="false"/> if the reader was
 171    /// completed by the application.</returns>
 172    internal async ValueTask<bool> ReceivedDataFrameAsync(
 173        int dataSize,
 174        bool endStream,
 175        CancellationToken cancellationToken)
 18915176    {
 18915177        if (dataSize == 0 && !endStream)
 0178        {
 0179            throw new IceRpcException(
 0180                IceRpcError.IceRpcError,
 0181                "An empty Slic stream frame is not allowed unless endStream is true.");
 182        }
 183
 18915184        if (!_state.TrySetFlag(State.PipeWriterInUse))
 0185        {
 0186            throw new InvalidOperationException(
 0187                $"The {nameof(ReceivedDataFrameAsync)} operation is not thread safe.");
 188        }
 189
 190        try
 18915191        {
 18915192            if (_state.HasFlag(State.PipeWriterCompleted))
 1193            {
 1194                return false; // No bytes consumed because the application completed the stream input.
 195            }
 196
 18914197            int newWindowSize = Interlocked.Add(ref _windowSize, -dataSize);
 18914198            if (newWindowSize < 0)
 0199            {
 0200                throw new IceRpcException(
 0201                    IceRpcError.IceRpcError,
 0202                    "Received more data than flow control permits.");
 203            }
 204
 205            // Fill the pipe writer with dataSize bytes.
 18914206            await _stream.FillBufferWriterAsync(
 18914207                _pipe.Writer,
 18914208                dataSize,
 18914209                cancellationToken).ConfigureAwait(false);
 210
 18913211            if (endStream)
 2375212            {
 2375213                _pipe.Writer.Complete();
 2375214            }
 215            else
 16538216            {
 16538217                _ = await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 16538218            }
 219
 18913220            return true;
 221        }
 222        finally
 18915223        {
 18915224            if (_state.HasFlag(State.PipeWriterCompleted))
 153225            {
 226                // If the pipe writer has been completed while we were reading the data from the stream, we make sure to
 227                // complete the writer now since Complete or CompleteWriter didn't do it.
 153228                _pipe.Writer.Complete(_exception);
 153229            }
 18915230            _state.ClearFlag(State.PipeWriterInUse);
 18915231        }
 18914232    }
 233
 234    private ReadResult GetReadResult(ReadResult readResult)
 4235    {
 236        // This method is called by ReadAsync or TryRead when the read operation on _pipe.Reader returns a canceled read
 237        // result (IsCanceled=true). The _pipe.Reader ReadAsync/TryRead operations can return a canceled read result for
 238        // two different reasons:
 239        // - the application called CancelPendingRead
 240        // - the connection is closed while data is written on _pipe.Writer
 4241        Debug.Assert(readResult.IsCanceled);
 242
 4243        if (_state.HasFlag(State.PipeWriterCompleted))
 0244        {
 245            // The connection was closed while the pipe writer was in use. Either throw or return a non-canceled result
 246            // depending on the completion exception.
 0247            if (_exception is null)
 0248            {
 0249                return new ReadResult(readResult.Buffer, isCanceled: false, isCompleted: true);
 250            }
 251            else
 0252            {
 0253                throw ExceptionUtil.Throw(_exception);
 254            }
 255        }
 256        else
 4257        {
 258            // The application called CancelPendingRead, return the read result as-is.
 4259            return readResult;
 260        }
 4261    }
 262
 263    private void ThrowIfCompleted()
 24752264    {
 24752265        if (_state.HasFlag(State.Completed))
 0266        {
 267            // If the reader is completed, the caller is bogus, it shouldn't call read operations after completing the
 268            // pipe reader.
 0269            throw new InvalidOperationException("Reading is not allowed once the reader is completed.");
 270        }
 24752271    }
 272
 273    /// <summary>The state enumeration is used to ensure the reader is not used after it's completed and to ensure that
 274    /// the internal pipe writer isn't completed concurrently when it's being used by <see
 275    /// cref="ReceivedDataFrameAsync" />.</summary>
 276    private enum State : int
 277    {
 278        /// <summary><see cref="Complete" /> was called on this Slic pipe reader.</summary>
 279        Completed = 1,
 280
 281        /// <summary>Data is being written to the internal pipe writer.</summary>
 282        PipeWriterInUse = 2,
 283
 284        /// <summary>The internal pipe writer was completed by <see cref="CompleteReads" />.</summary>
 285        PipeWriterCompleted = 4,
 286    }
 287}