< Summary

Information
Class: IceRpc.ResettablePipeReaderDecorator
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/ResettablePipeReaderDecorator.cs
Tag: 275_13775359185
Line coverage
72%
Covered lines: 143
Uncovered lines: 54
Coverable lines: 197
Total lines: 386
Line coverage: 72.5%
Branch coverage
75%
Covered branches: 48
Total branches: 64
Branch coverage: 75%
Method coverage
100%
Covered methods: 14
Total methods: 14
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsResettable()100%11100%
set_IsResettable(...)66.66%9.01656.25%
.ctor(...)100%11100%
AdvanceTo(...)100%11100%
AdvanceTo(...)80%11.351076.19%
CancelPendingRead()100%11100%
Complete(...)100%66100%
ReadAsync()75%4.08482.6%
Reset()62.5%9.17873.68%
TryRead(...)50%9.94652.17%
ReadAtLeastAsyncCore()50%12.28644.11%
AdvanceDecoratee()87.5%88100%
ProcessReadResult(...)100%88100%
ThrowIfCompleted()50%2.5250%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/ResettablePipeReaderDecorator.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6
 7namespace IceRpc;
 8
 9/// <summary>Represents a <see cref="PipeReader" /> decorator that doesn't consume the data from the decoratee to allow
 10/// reading again this data from the beginning after being reset.</summary>
 11/// <remarks><para>The decorator becomes non-resettable if the decoratee's buffered data exceeds the maximum buffer size
 12/// provided to <see cref="ResettablePipeReaderDecorator(PipeReader, int)" /> or if the reading from the decoratee fails
 13/// with an exception other than <see cref="OperationCanceledException"/>.</para>
 14/// <para>Calling <see cref="Complete" /> on the decorator doesn't complete the decoratee to allow reading again the
 15/// data after the decorator is reset. It's therefore important to make the decorator non-resettable by setting <see
 16/// cref="IsResettable" /> to <see langword="false" /> to complete the decoratee.</para></remarks>
 17// The default CopyToAsync implementation is suitable for this reader implementation. It calls ReadAsync/AdvanceTo to
 18// read the data. This ensures that the decorated pipe reader buffered data is not consumed.
 19public sealed class ResettablePipeReaderDecorator : PipeReader
 20{
 21    /// <summary>Gets or sets a value indicating whether this decorator can be reset.</summary>
 22    /// <value><see langword="true"/> if this decorator can be reset; otherwise, <see langword="false"/>. Defaults to
 23    /// <see langword="true"/>.</value>
 24    /// <remarks>This property can only be set to <see langword="false" />. If <see cref="IsResettable"/> is <see
 25    /// langword="true" /> and <see cref="Complete" /> was called, the decoratee is completed.</remarks>
 26    public bool IsResettable
 27    {
 2428        get => _isResettable;
 29
 30        set
 2531        {
 2532            if (value)
 033            {
 034                throw new ArgumentException(
 035                    $"The {nameof(IsResettable)} property cannot be set to true.",
 036                    nameof(value));
 37            }
 38
 2539            if (_isResettable)
 2040            {
 2041                AdvanceDecoratee();
 42
 43                // If Complete was called on this resettable decorator without an intervening Reset, we call Complete
 44                // on the decoratee.
 45
 2046                _isResettable = false;
 2047                if (_isReaderCompleted)
 048                {
 49                    // We complete the decoratee with the saved exception (can be null).
 050                    Complete(_readerCompleteException);
 051                }
 2052            }
 2553        }
 54    }
 55
 56    // The latest consumed given by caller; reset by Reset.
 57    private SequencePosition? _consumed;
 58    private readonly PipeReader _decoratee;
 59    // The latest examined given by caller.
 60    private SequencePosition? _examined;
 61
 62    // The highest examined given to _decoratee; not affected by Reset.
 63    private SequencePosition? _highestExamined;
 64
 65    // True when read returned a canceled read result.
 66    private bool _isCanceled;
 67    // True when the caller complete this reader; reset by Reset.
 68    private bool _isReaderCompleted;
 69    private bool _isReadingInProgress;
 4470    private bool _isResettable = true;
 71    private readonly int _maxBufferSize;
 72    private Exception? _readerCompleteException;
 73
 74    // The latest sequence returned by _decoratee; not affected by Reset.
 75    private ReadOnlySequence<byte> _sequence;
 76
 77    /// <summary>Constructs a resettable pipe reader decorator.</summary>
 78    /// <param name="decoratee">The pipe reader being decorated.</param>
 79    /// <param name="maxBufferSize">The maximum size of buffered data in bytes.</param>
 4480    public ResettablePipeReaderDecorator(PipeReader decoratee, int maxBufferSize)
 4481    {
 4482        _decoratee = decoratee;
 4483        _maxBufferSize = maxBufferSize;
 4484    }
 85
 86    /// <summary>Moves forward the pipeline's read cursor to after the consumed data. No data is consumed while
 87    /// <see cref="IsResettable"/> value is true.</summary>
 88    /// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
 89    /// <seealso cref="PipeReader.AdvanceTo(SequencePosition)"/>
 3490    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);
 91
 92    /// <summary>Moves forward the pipeline's read cursor to after the consumed data. No data is consumed while
 93    /// <see cref="IsResettable"/> value is true.</summary>
 94    /// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
 95    /// <param name="examined">Marks the extent of the data that has been read and examined.</param>
 96    /// <seealso cref="PipeReader.AdvanceTo(SequencePosition, SequencePosition)"/>
 97    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
 4298    {
 99        // If reading returns a canceled read result, _isReadInProgress is set to false since it's not required to call
 100        // AdvanceTo. Calling AdvanceTo after getting a canceled read result is also valid so we don't check if reading
 101        // is in progress in this case.
 42102        if (!_isCanceled && !_isReadingInProgress)
 0103        {
 0104            throw new InvalidOperationException("Cannot call AdvanceTo before reading the PipeReader.");
 105        }
 106
 42107        _isReadingInProgress = false;
 108
 42109        Debug.Assert(_examined is null);
 110
 42111        if (_isResettable)
 36112        {
 36113            ThrowIfCompleted();
 114
 115            // Don't call _decoratee.AdvanceTo just yet. It will be called on the next ReadAsync/TryRead call. This
 116            // way, if Reset is called next, it won't mark the data as examined and the following ReadAsync/TryRead
 117            // call won't block. It will return the buffered data.
 36118            _examined = examined;
 36119            _consumed = consumed;
 36120        }
 121        else
 6122        {
 123            // The examined position given to _decoratee.AdvanceTo must be ever-increasing.
 6124            if (_highestExamined is not null &&
 6125                _sequence.GetOffset(examined) < _sequence.GetOffset(_highestExamined.Value))
 0126            {
 0127                examined = _highestExamined.Value;
 0128            }
 6129            _decoratee.AdvanceTo(consumed, examined);
 6130        }
 42131    }
 132
 133    /// <summary>Cancels the pending <see cref="ReadAsync(CancellationToken)"/> operation without causing it to throw
 134    /// and without completing the <see cref="PipeReader"/>. If there is no pending operation, this cancels the next
 135    /// operation.</summary>
 136    /// <seealso cref="PipeReader.CancelPendingRead"/>
 137    // This method can be called from another thread so we always forward it to the decoratee directly.
 138    // ReadAsync/ReadAtLeastAsync/TryRead will return IsCanceled as appropriate.
 6139    public override void CancelPendingRead() => _decoratee.CancelPendingRead();
 140
 141    /// <summary>Signals to the producer that the consumer is done reading.</summary>
 142    /// <param name="exception">Optional <see cref="Exception "/> indicating a failure that's causing the pipeline to
 143    /// complete.</param>
 144    /// <seealso cref="PipeReader.Complete(Exception?)"/>
 145    /// <remarks>If <see cref="IsResettable"/> value is true, <see cref="Complete" /> is not called on the decoratee to
 146    /// allow reading again the data after a call to <see cref="Reset" />. To complete the decoratee, <see
 147    /// cref="IsResettable" /> must be set to <see langword="false" />.</remarks>
 148    public override void Complete(Exception? exception = default)
 42149    {
 42150        if (_isResettable)
 20151        {
 20152            if (_isReadingInProgress)
 14153            {
 14154                AdvanceTo(_sequence.Start);
 14155            }
 156
 20157            if (!_isReaderCompleted)
 20158            {
 159                // Only save the first call to Complete
 20160                _isReaderCompleted = true;
 20161                _readerCompleteException = exception;
 20162            }
 163            // we naturally don't complete the decoratee, otherwise this decorator would no longer be resettable
 20164        }
 165        else
 22166        {
 22167            _isReadingInProgress = false;
 22168            _decoratee.Complete(exception);
 22169        }
 42170    }
 171
 172    /// <summary>Asynchronously reads a sequence of bytes from the current <see cref="PipeReader"/>.</summary>
 173    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
 174    /// <returns>A <see cref="ValueTask{TResult}"/> representing the asynchronous read operation.</returns>
 175    /// <seealso cref="PipeReader.ReadAsync(CancellationToken)"/>
 176    public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
 40177    {
 40178        _isReadingInProgress = !_isReadingInProgress ? true :
 40179            throw new InvalidOperationException("Reading is already in progress.");
 180
 40181        ThrowIfCompleted();
 182
 40183        AdvanceDecoratee();
 184
 185        ReadResult readResult;
 186        try
 40187        {
 40188            readResult = await _decoratee.ReadAsync(cancellationToken).ConfigureAwait(false);
 38189            if (readResult.IsCanceled)
 4190            {
 4191                _isCanceled = true;
 4192                _isReadingInProgress = false;
 4193            }
 38194        }
 2195        catch (OperationCanceledException)
 2196        {
 2197            _isReadingInProgress = false;
 2198            throw;
 199        }
 0200        catch
 0201        {
 0202            _isResettable = false;
 0203            throw;
 204        }
 38205        return ProcessReadResult(readResult);
 38206    }
 207
 208    /// <summary>Resets this pipe reader.</summary>
 209    /// <exception cref="InvalidOperationException">Thrown if <see cref="IsResettable" /> is <see langword="false" /> or
 210    /// if reading is in progress.</exception>
 211    public void Reset()
 26212    {
 26213        if (_isResettable)
 26214        {
 26215            if (_isReadingInProgress)
 0216            {
 0217                throw new InvalidOperationException(
 0218                    "The resettable pipe reader decorator cannot be reset while reading is in progress.");
 219            }
 220
 26221            if (_examined is not null)
 12222            {
 223                // Don't commit the caller's examined data on the decoratee. This ensures that the next ReadAsync call
 224                // returns synchronously with the decoratee's buffered data (instead of blocking).
 12225                _decoratee.AdvanceTo(_sequence.Start, _highestExamined ?? _sequence.Start);
 12226                _examined = null;
 12227            }
 228
 26229            _consumed = null;
 26230            _isReaderCompleted = false;
 26231            _readerCompleteException = null;
 26232        }
 233        else
 0234        {
 0235            throw new InvalidOperationException("Cannot reset non-resettable pipe reader decorator.");
 236        }
 26237    }
 238
 239    /// <summary>Attempts to synchronously read data from the <see cref="PipeReader"/>.</summary>
 240    /// <param name="result">When this method returns <see langword="true"/>, this value is set to a
 241    /// <see cref="ReadResult"/> instance that represents the result of the read call; otherwise, this value is set to
 242    /// <see langword="default"/>.</param>
 243    /// <returns><see langword="true"/> if data was available, or if the call was canceled or the writer was completed;
 244    /// otherwise, <see langword="false"/>.</returns>
 245    /// <seealso cref="PipeReader.TryRead(out ReadResult)"/>.
 246    public override bool TryRead(out ReadResult result)
 4247    {
 4248        _isReadingInProgress = !_isReadingInProgress ? true :
 4249            throw new InvalidOperationException("Reading is already in progress.");
 250
 4251        ThrowIfCompleted();
 252
 4253        AdvanceDecoratee();
 254
 255        try
 4256        {
 4257            if (_decoratee.TryRead(out result))
 4258            {
 4259                if (result.IsCanceled)
 0260                {
 0261                    _isCanceled = true;
 0262                    _isReadingInProgress = false;
 0263                }
 4264                result = ProcessReadResult(result);
 4265                return true;
 266            }
 267            else
 0268            {
 0269                _isReadingInProgress = false;
 0270                return false;
 271            }
 272        }
 0273        catch
 0274        {
 0275            _isResettable = false;
 0276            throw;
 277        }
 4278    }
 279
 280    /// <summary>Asynchronously reads a sequence of bytes from the current PipeReader.</summary>
 281    /// <param name="minimumSize">The minimum length that needs to be buffered in order for the call to return.</param>
 282    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
 283    /// <returns>A <see cref="ValueTask{TResult}"/> representing the asynchronous read operation.</returns>
 284    protected override async ValueTask<ReadResult> ReadAtLeastAsyncCore(
 285        int minimumSize,
 286        CancellationToken cancellationToken = default)
 4287    {
 4288        _isReadingInProgress = !_isReadingInProgress ? true :
 4289            throw new InvalidOperationException("Reading is already in progress.");
 290
 4291        ThrowIfCompleted();
 292
 4293        AdvanceDecoratee();
 294
 4295        long size = (_consumed is null ? 0 : _sequence.GetOffset(_consumed.Value)) + minimumSize;
 296        try
 4297        {
 4298            minimumSize = checked((int)size);
 4299        }
 0300        catch (OverflowException exception)
 0301        {
 302            // In theory this shouldn't happen if _maxBufferSize is set to a reasonable value.
 0303            throw new ArgumentException(
 0304                $"{minimumSize} is too large and would cause the buffered data to be larger than int.MaxValue",
 0305                nameof(minimumSize),
 0306                exception);
 307        }
 308
 309        ReadResult readResult;
 310        try
 4311        {
 4312            readResult = await _decoratee.ReadAtLeastAsync(minimumSize, cancellationToken).ConfigureAwait(false);
 4313            if (readResult.IsCanceled)
 0314            {
 0315                _isCanceled = true;
 0316                _isReadingInProgress = false;
 0317            }
 4318        }
 0319        catch (OperationCanceledException)
 0320        {
 0321            _isReadingInProgress = false;
 0322            throw;
 323        }
 0324        catch
 0325        {
 0326            _isReadingInProgress = false;
 0327            _isResettable = false;
 0328            throw;
 329        }
 4330        return ProcessReadResult(readResult);
 4331    }
 332
 333    /// <summary>Advances the decoratee to the examined position saved by the AdvanceTo call on the decorator.</summary>
 334    /// <remarks>Calling AdvanceTo on the decoratee commits the latest examined data and ensures the next read call will
 335    /// read additional data if all the data was examined. If the decorator is reset, this ensures that the next read
 336    /// call will immediately return the buffered data instead of blocking.</remarks>
 337    private void AdvanceDecoratee()
 68338    {
 68339        _isCanceled = false;
 340
 68341        if (_isResettable && _examined is not null)
 8342        {
 343            // The examined position given to _decoratee.AdvanceTo must be ever-increasing.
 8344            if (_highestExamined is null ||
 8345                _sequence.GetOffset(_examined.Value) > _sequence.GetOffset(_highestExamined.Value))
 8346            {
 8347                _highestExamined = _examined;
 8348            }
 349
 8350            _decoratee.AdvanceTo(_sequence.Start, _highestExamined.Value);
 351
 8352            _examined = null;
 8353        }
 68354    }
 355
 356    private ReadResult ProcessReadResult(ReadResult readResult)
 46357    {
 46358        _sequence = readResult.Buffer;
 359
 46360        if (_consumed is SequencePosition consumed)
 6361        {
 362            // Remove bytes marked as consumed
 6363            readResult = new ReadResult(
 6364                readResult.Buffer.Slice(consumed),
 6365                readResult.IsCanceled,
 6366                readResult.IsCompleted);
 6367        }
 368
 369        // We don't retry when the buffered data exceeds the maximum buffer size.
 46370        if (_isResettable && (_sequence.Length > _maxBufferSize))
 2371        {
 2372            _isResettable = false;
 2373        }
 374
 46375        return readResult;
 46376    }
 377
 378    private void ThrowIfCompleted()
 84379    {
 84380        if (_isReaderCompleted)
 0381        {
 0382            _isResettable = false;
 0383            throw new InvalidOperationException("The pipe reader is completed.");
 384        }
 84385    }
 386}