< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs
Tag: 276_17717543480
Line coverage
92%
Covered lines: 106
Uncovered lines: 9
Coverable lines: 115
Total lines: 232
Line coverage: 92.1%
Branch coverage
82%
Covered branches: 28
Total branches: 34
Branch coverage: 82.3%
Method coverage
93%
Covered methods: 14
Total methods: 15
Method coverage: 93.3%

Metrics

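| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| get_CanGetUnflushedBytes() | 100% | 1 | 1 | 100% |
| get_UnflushedBytes() | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Advance(...) | 50% | 2.15 | 2 | 66.66% |
| CancelPendingFlush() | 100% | 2 | 1 | 0% |
| Complete(...) | 100% | 6 | 6 | 100% |
| FlushAsync(...) | 100% | 1 | 1 | 100% |
| GetMemory(...) | 100% | 1 | 1 | 100% |
| GetSpan(...) | 100% | 1 | 1 | 100% |
| WriteAsync(...) | 100% | 1 | 1 | 100% |
| WriteAsync() | 80% | 20.04 | 20 | 95.45% |
| AcquireSendCreditAsync() | 100% | 1 | 1 | 100% |
| CompleteWrites(...) | 100% | 1 | 1 | 100% |
| ConsumedSendCredit(...) | 100% | 2 | 2 | 100% |
| ReceivedWindowUpdateFrame(...) | 75% | 4.3 | 4 | 73.33% |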

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable
 11#pragma warning disable CA1001
 12internal class SlicPipeWriter : ReadOnlySequencePipeWriter
 13#pragma warning restore CA1001
 14{
 215    public override bool CanGetUnflushedBytes => true;
 16
 709417    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
 18
 19    // We can avoid disposing _completeWritesCts because it was not created using CreateLinkedTokenSource, and it
 20    // doesn't use a timer. It is not easy to dispose it because CompleteWrites can be called by another thread after
 21    // Complete has been called.
 531622    private readonly CancellationTokenSource _completeWritesCts = new();
 23    private Exception? _exception;
 24    private bool _isCompleted;
 531625    private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize;
 26    private readonly Pipe _pipe;
 27    // The semaphore is used when flow control is enabled to wait for additional send credit to be available.
 531628    private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1);
 29    private readonly SlicStream _stream;
 30
 31    public override void Advance(int bytes)
 1895632    {
 1895633        if (_isCompleted)
 034        {
 035            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 36        }
 1895637        _pipe.Writer.Advance(bytes);
 1895638    }
 39
 40    // SlicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 41    // installs a payload writer interceptor, this interceptor should never call it on "next".
 042    public override void CancelPendingFlush() => throw new NotSupportedException();
 43
 44    public override void Complete(Exception? exception = null)
 602345    {
 602346        if (!_isCompleted)
 526247        {
 526248            _isCompleted = true;
 49
 526250            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 251            {
 252                throw new InvalidOperationException(
 253                    $"Completing a {nameof(SlicPipeWriter)} without an exception is not allowed when this pipe writer ha
 54            }
 55
 56            // If the exception is set, forcefully close the stream writes if writes were not already gracefully closed
 57            // by WriteAsync called with endStream=true. Otherwise, if exception is null, writes are gracefully closed.
 526058            _stream.CloseWrites(graceful: exception is null);
 59
 526060            _pipe.Writer.Complete();
 526061            _pipe.Reader.Complete();
 62
 63            // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException
 64            // from ReceivedWindowUpdateFrame if a StreamWindowUpdate is received after the application completed the
 65            // stream output. An alternative would be to add a lock but it's a bit overkill given that disposing the
 66            // semaphore is only useful when using SemaphoreSlim.AvailableWaitHandle.
 67            // _sendCreditSemaphore.Dispose();
 526068        }
 602169    }
 70
 71    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 72        // WriteAsync will flush the internal buffer
 79273        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 74
 475    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint);
 76
 1895677    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint);
 78
 79    public override ValueTask<FlushResult> WriteAsync(
 80        ReadOnlyMemory<byte> source,
 81        CancellationToken cancellationToken) =>
 1127882        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 83
 84    public override async ValueTask<FlushResult> WriteAsync(
 85        ReadOnlySequence<byte> source,
 86        bool endStream,
 87        CancellationToken cancellationToken)
 1561488    {
 1561489        if (_isCompleted)
 090        {
 091            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 92        }
 93
 94        // Flush the pipe before the check for the closed connection. This makes sure that the check for unflushed data
 95        // on successful completion succeeds. See the Complete implementation above.
 1561496        if (_pipe.Writer.UnflushedBytes > 0)
 429997        {
 429998            await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 429999        }
 100
 15614101        _stream.ThrowIfConnectionClosed();
 102
 103        // Abort the stream if the invocation is canceled.
 15604104        using CancellationTokenRegistration cancelTokenRegistration = cancellationToken.UnsafeRegister(
 8105            cts => ((CancellationTokenSource)cts!).Cancel(),
 15604106            _completeWritesCts);
 107
 108        ReadOnlySequence<byte> source1;
 109        ReadOnlySequence<byte> source2;
 15604110        if (_pipe.Reader.TryRead(out ReadResult readResult))
 4299111        {
 4299112            Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 4299113            source1 = readResult.Buffer;
 4299114            source2 = source;
 4299115        }
 116        else
 11305117        {
 11305118            source1 = source;
 11305119            source2 = ReadOnlySequence<byte>.Empty;
 11305120        }
 121
 15604122        if (source1.IsEmpty && source2.IsEmpty && !endStream)
 9123        {
 124            // WriteAsync is called with an empty buffer, typically by a call to FlushAsync. Some payload writers such
 125            // as the deflate compressor might do this.
 9126            return new FlushResult(isCanceled: false, isCompleted: false);
 127        }
 128
 129        try
 15595130        {
 15595131            return await _stream.WriteStreamFrameAsync(
 15595132                source1,
 15595133                source2,
 15595134                endStream,
 15595135                _completeWritesCts.Token).ConfigureAwait(false);
 136        }
 2027137        catch (OperationCanceledException)
 2027138        {
 2027139            cancellationToken.ThrowIfCancellationRequested();
 2019140            return _exception is null ?
 2019141                new FlushResult(isCanceled: false, isCompleted: true) :
 2019142                throw ExceptionUtil.Throw(_exception);
 143        }
 144        finally
 15595145        {
 15595146            if (readResult.Buffer.Length > 0)
 4299147            {
 4299148                _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 149
 150                // Make sure there's no more data to consume from the pipe.
 4299151                Debug.Assert(!_pipe.Reader.TryRead(out ReadResult _));
 4299152            }
 15595153        }
 15596154    }
 155
 5316156    internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
 5316157    {
 5316158        _stream = stream;
 5316159        _peerWindowSize = connection.PeerInitialStreamWindowSize;
 160
 161        // Create a pipe that never pauses on flush or write. The SlicPipeWriter will pause the flush or write if
 162        // the Slic flow control doesn't permit sending more data.
 163        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 164        // matter (_pipe.Writer.FlushAsync never blocks).
 5316165        _pipe = new(new PipeOptions(
 5316166            pool: connection.Pool,
 5316167            minimumSegmentSize: connection.MinSegmentSize,
 5316168            pauseWriterThreshold: 0,
 5316169            useSynchronizationContext: false));
 5316170    }
 171
 172    /// <summary>Acquires send credit.</summary>
 173    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 174    /// <returns>The available send credit.</returns>
 175    /// <remarks>The send credit matches the size of the peer's flow-control window.</remarks>
 176    internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken)
 20176177    {
 178        // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the
 179        // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send
 180        // additional data (_peerWindowSize <= 0).
 20176181        await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 18149182        return _peerWindowSize;
 18149183    }
 184
 185    /// <summary>Completes writes.</summary>
 186    /// <param name="exception">The exception that will be raised by <see cref="PipeWriter.WriteAsync" /> or <see
 187    /// cref="FlushAsync" />.</param>
 188    internal void CompleteWrites(Exception? exception)
 3290189    {
 3290190        Interlocked.CompareExchange(ref _exception, exception, null);
 3290191        _completeWritesCts.Cancel();
 3290192    }
 193
 194    /// <summary>Notifies the writer of the amount of send credit consumed by the sending of a stream frame.</summary>
 195    /// <param name="size">The size of the stream frame.</param>
 196    internal void ConsumedSendCredit(int size)
 18149197    {
 18149198        Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired.
 199
 200        // Release the semaphore if the peer's window size is still greater than 0.
 18149201        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size);
 18149202        if (newPeerWindowSize > 0)
 14909203        {
 14909204            _sendCreditSemaphore.Release();
 14909205        }
 18149206    }
 207
 208    /// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 209    /// <param name="size">The window size increment.</param>
 210    internal void ReceivedWindowUpdateFrame(int size)
 2064211    {
 2064212        Debug.Assert(size > 0);
 213
 2064214        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size);
 2064215        if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize)
 0216        {
 0217            throw new IceRpcException(
 0218                IceRpcError.IceRpcError,
 0219                $"The window update is trying to increase the window size to a value larger than allowed.");
 220        }
 221
 2064222        int previousPeerWindowSize = newPeerWindowSize - size;
 223
 224        // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't
 225        // release the semaphore. We can now release the semaphore to allow another write to send data.
 2064226        if (previousPeerWindowSize == 0)
 1222227        {
 1222228            Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
 1222229            _sendCreditSemaphore.Release();
 1222230        }
 2064231    }
 232}