< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeReader
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs
Tag: 1321_24790053727
Line coverage
81%
Covered lines: 116
Uncovered lines: 27
Coverable lines: 143
Total lines: 287
Line coverage: 81.1%
Branch coverage
76%
Covered branches: 32
Total branches: 42
Branch coverage: 76.1%
Method coverage
100%
Covered methods: 11
Fully covered methods: 7
Total methods: 11
Method coverage: 100%
Full method coverage: 63.6%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
AdvanceTo(...)100%11100%
AdvanceTo(...)100%22100%
CancelPendingRead()100%11100%
Complete(...)100%22100%
ReadAsync()100%66100%
TryRead(...)62.5%12861.9%
.ctor(...)100%11100%
CompleteReads(...)100%44100%
ReceivedDataFrameAsync()78.57%181472.5%
GetReadResult(...)25%6450%
ThrowIfCompleted()50%2260%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// The SlicPipeReader doesn't override ReadAtLeastAsyncCore or CopyToAsync methods because:
 11// - we can't forward the calls to the internal pipe reader since reading relies on the AdvanceTo implementation to send
 12//   the StreamWindowUpdate frame once the data is examined,
 13// - the default implementation can't be much optimized.
 14internal class SlicPipeReader : PipeReader
 15{
 16    private int _examined;
 17    private volatile Exception? _exception;
 18    private long _lastExaminedOffset;
 19    private readonly Pipe _pipe;
 20    private ReadResult _readResult;
 21    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 22    // locking.
 23    private int _state;
 24    private readonly SlicStream _stream;
 25    private int _windowSize;
 26
 337327    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);
 28
 29    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
 699430    {
 699431        ThrowIfCompleted();
 32
 699433        long startOffset = _readResult.Buffer.GetOffset(_readResult.Buffer.Start);
 699434        long consumedOffset = _readResult.Buffer.GetOffset(consumed) - startOffset;
 699435        long examinedOffset = _readResult.Buffer.GetOffset(examined) - startOffset;
 36
 37        // Add the additional examined bytes to the examined bytes total.
 699438        _examined += (int)(examinedOffset - _lastExaminedOffset);
 699439        _lastExaminedOffset = examinedOffset - consumedOffset;
 40
 41        // If the number of examined bytes is superior to the window update threshold, notifies the stream of the window
 42        // update. This will trigger the sending of a window update frame and allow the sender to send additional data.
 699443        if (_examined >= _stream.WindowUpdateThreshold)
 116544        {
 116545            Interlocked.Add(ref _windowSize, _examined);
 116546            _stream.WindowUpdate(_examined);
 116547            _examined = 0;
 116548        }
 49
 699450        _pipe.Reader.AdvanceTo(consumed, examined);
 699451    }
 52
 253    public override void CancelPendingRead() => _pipe.Reader.CancelPendingRead();
 54
 55    public override void Complete(Exception? exception = null)
 272256    {
 272257        if (_state.TrySetFlag(State.Completed))
 270058        {
 59            // Forcefully close the stream reads if reads were not already gracefully closed by ReadAsync or TryRead.
 270060            _stream.CloseReads(graceful: false);
 61
 270062            CompleteReads(exception: null);
 63
 270064            _pipe.Reader.Complete();
 270065        }
 272266    }
 67
 68    public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
 727469    {
 727470        ThrowIfCompleted();
 71
 727472        if (_exception is not null)
 1173        {
 1174            _stream.ThrowIfConnectionClosed();
 875        }
 76
 727177        ReadResult result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 701078        if (result.IsCanceled)
 279        {
 280            return GetReadResult(result);
 81        }
 82
 83        // Cache the read result for the implementation of AdvanceTo that needs to figure out how much data got examined
 84        // and consumed.
 700885        _readResult = result;
 86
 87        // All the data from the peer is considered read at this point. It's time to close reads on the stream. This
 88        // will write the StreamReadsClosed frame to the peer and allow it to release the stream semaphore.
 700889        if (result.IsCompleted)
 129290        {
 129291            _stream.CloseReads(graceful: true);
 129292        }
 93
 700894        return result;
 701095    }
 96
 97    public override bool TryRead(out ReadResult result)
 7698    {
 7699        ThrowIfCompleted();
 100
 76101        if (_exception is not null)
 0102        {
 0103            _stream.ThrowIfConnectionClosed();
 0104        }
 105
 76106        if (_pipe.Reader.TryRead(out result))
 76107        {
 76108            if (result.IsCanceled)
 0109            {
 0110                result = GetReadResult(result);
 0111                return true;
 112            }
 113
 114            // Cache the read result for the implementation of AdvanceTo that needs to figure out how much data got
 115            // examined and consumed.
 76116            _readResult = result;
 117
 118            // All the data from the peer is considered read at this point. It's time to close reads on the stream. This
 119            // will write the StreamReadsClosed frame to the peer and allow it to release the stream semaphore.
 76120            if (result.IsCompleted)
 76121            {
 76122                _stream.CloseReads(graceful: true);
 76123            }
 76124            return true;
 125        }
 126        else
 0127        {
 0128            return false;
 129        }
 76130    }
 131
 2731132    internal SlicPipeReader(SlicStream stream, SlicConnection connection)
 2731133    {
 2731134        _stream = stream;
 2731135        _windowSize = connection.InitialStreamWindowSize;
 136
 137        // We keep the default readerScheduler (ThreadPool) because the _pipe.Writer.FlushAsync executes in the
 138        // "read loop task" and we don't want this task to continue into application code. The writerScheduler
 139        // doesn't matter since _pipe.Writer.FlushAsync never blocks.
 2731140        _pipe = new(new PipeOptions(
 2731141            pool: connection.Pool,
 2731142            pauseWriterThreshold: 0,
 2731143            minimumSegmentSize: connection.MinSegmentSize,
 2731144            useSynchronizationContext: false));
 2731145    }
 146
 147    /// <summary>Completes reads.</summary>
 148    /// <param name="exception">The exception that will be raised by <see cref="ReadAsync" /> or <see cref="TryRead" />
 149    /// operation.</param>
 150    internal void CompleteReads(Exception? exception)
 3042151    {
 3042152        Interlocked.CompareExchange(ref _exception, exception, null);
 153
 3042154        if (_state.TrySetFlag(State.PipeWriterCompleted))
 2728155        {
 2728156            if (_state.HasFlag(State.PipeWriterInUse))
 173157            {
 173158                _pipe.Reader.CancelPendingRead();
 173159            }
 160            else
 2555161            {
 2555162                _pipe.Writer.Complete(exception);
 2555163            }
 2728164        }
 3042165    }
 166
 167    /// <summary>Notifies the reader of the reception of a <see cref="FrameType.Stream" /> or <see
 168    /// cref="FrameType.StreamLast" /> frame. The stream data is consumed from the connection and buffered by this
 169    /// reader on its internal pipe.</summary>
 170    /// <returns><see langword="true" /> if the data was consumed; otherwise, <see langword="false"/> if the reader was
 171    /// completed by the application.</returns>
 172    internal async ValueTask<bool> ReceivedDataFrameAsync(
 173        int dataSize,
 174        bool endStream,
 175        CancellationToken cancellationToken)
 9530176    {
 9530177        if (dataSize == 0 && !endStream)
 0178        {
 0179            throw new IceRpcException(
 0180                IceRpcError.IceRpcError,
 0181                "An empty Slic stream frame is not allowed unless endStream is true.");
 182        }
 183
 9530184        if (!_state.TrySetFlag(State.PipeWriterInUse))
 0185        {
 0186            throw new InvalidOperationException(
 0187                $"The {nameof(ReceivedDataFrameAsync)} operation is not thread safe.");
 188        }
 189
 190        try
 9530191        {
 9530192            if (_state.HasFlag(State.PipeWriterCompleted))
 1193            {
 1194                return false; // No bytes consumed because the application completed the stream input.
 195            }
 196
 9529197            int newWindowSize = Interlocked.Add(ref _windowSize, -dataSize);
 9529198            if (newWindowSize < 0)
 0199            {
 0200                throw new IceRpcException(
 0201                    IceRpcError.IceRpcError,
 0202                    "Received more data than flow control permits.");
 203            }
 204
 205            // Fill the pipe writer with dataSize bytes.
 9529206            await _stream.FillBufferWriterAsync(
 9529207                _pipe.Writer,
 9529208                dataSize,
 9529209                cancellationToken).ConfigureAwait(false);
 210
 9529211            if (endStream)
 1265212            {
 1265213                _pipe.Writer.Complete();
 1265214            }
 215            else
 8264216            {
 8264217                _ = await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 8264218            }
 219
 9529220            return true;
 221        }
 222        finally
 9530223        {
 9530224            if (_state.HasFlag(State.PipeWriterCompleted))
 174225            {
 226                // If the pipe writer has been completed while we were reading the data from the stream, we make sure to
 227                // complete the writer now since Complete or CompleteWriter didn't do it.
 174228                _pipe.Writer.Complete(_exception);
 174229            }
 9530230            _state.ClearFlag(State.PipeWriterInUse);
 9530231        }
 9530232    }
 233
 234    private ReadResult GetReadResult(ReadResult readResult)
 2235    {
 236        // This method is called by ReadAsync or TryRead when the read operation on _pipe.Reader returns a canceled read
 237        // result (IsCanceled=true). The _pipe.Reader ReadAsync/TryRead operations can return a canceled read result for
 238        // two different reasons:
 239        // - the application called CancelPendingRead
 240        // - the connection is closed while data is written on _pipe.Writer
 2241        Debug.Assert(readResult.IsCanceled);
 242
 2243        if (_state.HasFlag(State.PipeWriterCompleted))
 0244        {
 245            // The connection was closed while the pipe writer was in use. Either throw or return a non-canceled result
 246            // depending on the completion exception.
 0247            if (_exception is null)
 0248            {
 0249                return new ReadResult(readResult.Buffer, isCanceled: false, isCompleted: true);
 250            }
 251            else
 0252            {
 0253                throw ExceptionUtil.Throw(_exception);
 254            }
 255        }
 256        else
 2257        {
 258            // The application called CancelPendingRead, return the read result as-is.
 2259            return readResult;
 260        }
 2261    }
 262
 263    private void ThrowIfCompleted()
 14344264    {
 14344265        if (_state.HasFlag(State.Completed))
 0266        {
 267            // If the reader is completed, the caller is bogus, it shouldn't call read operations after completing the
 268            // pipe reader.
 0269            throw new InvalidOperationException("Reading is not allowed once the reader is completed.");
 270        }
 14344271    }
 272
 273    /// <summary>The state enumeration is used to ensure the reader is not used after it's completed and to ensure that
 274    /// the internal pipe writer isn't completed concurrently when it's being used by <see
 275    /// cref="ReceivedDataFrameAsync" />.</summary>
 276    private enum State : int
 277    {
 278        /// <summary><see cref="Complete" /> was called on this Slic pipe reader.</summary>
 279        Completed = 1,
 280
 281        /// <summary>Data is being written to the internal pipe writer.</summary>
 282        PipeWriterInUse = 2,
 283
 284        /// <summary>The internal pipe writer was completed by <see cref="CompleteReads" />.</summary>
 285        PipeWriterCompleted = 4,
 286    }
 287}