< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs
Tag: 701_22528036593
Line coverage
92%
Covered lines: 106
Uncovered lines: 9
Coverable lines: 115
Total lines: 232
Line coverage: 92.1%
Branch coverage
82%
Covered branches: 28
Total branches: 34
Branch coverage: 82.3%
Method coverage
93%
Covered methods: 14
Total methods: 15
Method coverage: 93.3%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_CanGetUnflushedBytes()100%11100%
get_UnflushedBytes()100%11100%
.ctor(...)100%11100%
Advance(...)50%2.15266.66%
CancelPendingFlush()100%210%
Complete(...)100%66100%
FlushAsync(...)100%11100%
GetMemory(...)100%11100%
GetSpan(...)100%11100%
WriteAsync(...)100%11100%
WriteAsync()80%20.042095.45%
AcquireSendCreditAsync()100%11100%
CompleteWrites(...)100%11100%
ConsumedSendCredit(...)100%22100%
ReceivedWindowUpdateFrame(...)75%4.3473.33%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable
 11#pragma warning disable CA1001
 12internal class SlicPipeWriter : ReadOnlySequencePipeWriter
 13#pragma warning restore CA1001
 14{
 115    public override bool CanGetUnflushedBytes => true;
 16
 364917    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
 18
 19    // We can avoid disposing _completeWritesCts because it was not created using CreateLinkedTokenSource, and it
 20    // doesn't use a timer. It is not easy to dispose it because CompleteWrites can be called by another thread after
 21    // Complete has been called.
 272722    private readonly CancellationTokenSource _completeWritesCts = new();
 23    private Exception? _exception;
 24    private bool _isCompleted;
 272725    private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize;
 26    private readonly Pipe _pipe;
 27    // The semaphore is used when flow control is enabled to wait for additional send credit to be available.
 272728    private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1);
 29    private readonly SlicStream _stream;
 30
 31    public override void Advance(int bytes)
 982332    {
 982333        if (_isCompleted)
 034        {
 035            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 36        }
 982337        _pipe.Writer.Advance(bytes);
 982338    }
 39
 40    // SlicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 41    // installs a payload writer interceptor, this interceptor should never call it on "next".
 042    public override void CancelPendingFlush() => throw new NotSupportedException();
 43
 44    public override void Complete(Exception? exception = null)
 308745    {
 308746        if (!_isCompleted)
 270047        {
 270048            _isCompleted = true;
 49
 270050            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 151            {
 152                throw new InvalidOperationException(
 153                    $"Completing a {nameof(SlicPipeWriter)} without an exception is not allowed when this pipe writer ha
 54            }
 55
 56            // If the exception is set, forcefully close the stream writes if writes were not already gracefully closed
 57            // by WriteAsync called with endStream=true. Otherwise, if exception is null, writes are gracefully closed.
 269958            _stream.CloseWrites(graceful: exception is null);
 59
 269960            _pipe.Writer.Complete();
 269961            _pipe.Reader.Complete();
 62
 63            // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException
 64            // from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream
 65            // output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore
 66            // is only useful when using SemaphoreSlim.AvailableWaitHandle.
 67            // _sendCreditSemaphore.Dispose();
 269968        }
 308669    }
 70
 71    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 72        // WriteAsync will flush the internal buffer
 42473        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 74
 275    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint);
 76
 982377    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint);
 78
 79    public override ValueTask<FlushResult> WriteAsync(
 80        ReadOnlyMemory<byte> source,
 81        CancellationToken cancellationToken) =>
 564282        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 83
 84    public override async ValueTask<FlushResult> WriteAsync(
 85        ReadOnlySequence<byte> source,
 86        bool endStream,
 87        CancellationToken cancellationToken)
 788488    {
 788489        if (_isCompleted)
 090        {
 091            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 92        }
 93
 94        // Flush the pipe before the check for the close connection. This makes sure that the check for unflushed data
 95        // on successful compete succeeds. See the Complete implementation above.
 788496        if (_pipe.Writer.UnflushedBytes > 0)
 222397        {
 222398            await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 222399        }
 100
 7884101        _stream.ThrowIfConnectionClosed();
 102
 103        // Abort the stream if the invocation is canceled.
 7879104        using CancellationTokenRegistration cancelTokenRegistration = cancellationToken.UnsafeRegister(
 4105            cts => ((CancellationTokenSource)cts!).Cancel(),
 7879106            _completeWritesCts);
 107
 108        ReadOnlySequence<byte> source1;
 109        ReadOnlySequence<byte> source2;
 7879110        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2223111        {
 2223112            Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 2223113            source1 = readResult.Buffer;
 2223114            source2 = source;
 2223115        }
 116        else
 5656117        {
 5656118            source1 = source;
 5656119            source2 = ReadOnlySequence<byte>.Empty;
 5656120        }
 121
 7879122        if (source1.IsEmpty && source2.IsEmpty && !endStream)
 7123        {
 124            // WriteAsync is called with an empty buffer, typically by a call to FlushAsync. Some payload writers such
 125            // as the deflate compressor might do this.
 7126            return new FlushResult(isCanceled: false, isCompleted: false);
 127        }
 128
 129        try
 7872130        {
 7872131            return await _stream.WriteStreamFrameAsync(
 7872132                source1,
 7872133                source2,
 7872134                endStream,
 7872135                _completeWritesCts.Token).ConfigureAwait(false);
 136        }
 1013137        catch (OperationCanceledException)
 1013138        {
 1013139            cancellationToken.ThrowIfCancellationRequested();
 1009140            return _exception is null ?
 1009141                new FlushResult(isCanceled: false, isCompleted: true) :
 1009142                throw ExceptionUtil.Throw(_exception);
 143        }
 144        finally
 7872145        {
 7872146            if (readResult.Buffer.Length > 0)
 2223147            {
 2223148                _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 149
 150                // Make sure there's no more data to consume from the pipe.
 2223151                Debug.Assert(!_pipe.Reader.TryRead(out ReadResult _));
 2223152            }
 7872153        }
 7875154    }
 155
 2727156    internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
 2727157    {
 2727158        _stream = stream;
 2727159        _peerWindowSize = connection.PeerInitialStreamWindowSize;
 160
 161        // Create a pipe that never pauses on flush or write. The SlicePipeWriter will pause the flush or write if
 162        // the Slic flow control doesn't permit sending more data.
 163        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 164        // matter (_pipe.Writer.FlushAsync never blocks).
 2727165        _pipe = new(new PipeOptions(
 2727166            pool: connection.Pool,
 2727167            minimumSegmentSize: connection.MinSegmentSize,
 2727168            pauseWriterThreshold: 0,
 2727169            useSynchronizationContext: false));
 2727170    }
 171
 172    /// <summary>Acquires send credit.</summary>
 173    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 174    /// <returns>The available send credit.</returns>
 175    /// <remarks>The send credit matches the size of the peer's flow-control window.</remarks>
 176    internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken)
 10148177    {
 178        // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the
 179        // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send
 180        // additional data (_peerWindowSize <= 0).
 10148181        await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 9135182        return _peerWindowSize;
 9135183    }
 184
 185    /// <summary>Complete writes.</summary>
 186    /// <param name="exception">The exception that will be raised by <see cref="PipeWriter.WriteAsync" /> or <see
 187    /// cref="FlushAsync" />.</param>
 188    internal void CompleteWrites(Exception? exception)
 1665189    {
 1665190        Interlocked.CompareExchange(ref _exception, exception, null);
 1665191        _completeWritesCts.Cancel();
 1665192    }
 193
 194    /// <summary>Notifies the writer of the amount of send credit consumed by the sending of a stream frame.</summary>
 195    /// <param name="size">The size of the stream frame.</param>
 196    internal void ConsumedSendCredit(int size)
 9135197    {
 9135198        Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired.
 199
 200        // Release the semaphore if the peer's window size is still superior to 0
 9135201        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size);
 9135202        if (newPeerWindowSize > 0)
 7626203        {
 7626204            _sendCreditSemaphore.Release();
 7626205        }
 9135206    }
 207
 208    /// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 209    /// <param name="size">The window size increment.</param>
 210    internal void ReceivedWindowUpdateFrame(int size)
 1076211    {
 1076212        Debug.Assert(size > 0);
 213
 1076214        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size);
 1076215        if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize)
 0216        {
 0217            throw new IceRpcException(
 0218                IceRpcError.IceRpcError,
 0219                $"The window update is trying to increase the window size to a value larger than allowed.");
 220        }
 221
 1076222        int previousPeerWindowSize = newPeerWindowSize - size;
 223
 224        // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't
 225        // release the semaphore. We can now release the semaphore to allow another write to send data.
 1076226        if (previousPeerWindowSize == 0)
 500227        {
 500228            Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
 500229            _sendCreditSemaphore.Release();
 500230        }
 1076231    }
 232}