< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs
Tag: 592_20856082467
Line coverage
92%
Covered lines: 106
Uncovered lines: 9
Coverable lines: 115
Total lines: 232
Line coverage: 92.1%
Branch coverage
82%
Covered branches: 28
Total branches: 34
Branch coverage: 82.3%
Method coverage
93%
Covered methods: 14
Total methods: 15
Method coverage: 93.3%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_CanGetUnflushedBytes() | 100% | 1 | 1 | 100%
get_UnflushedBytes() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
Advance(...) | 50% | 2.15 | 2 | 66.66%
CancelPendingFlush() | 100% | 2 | 1 | 0%
Complete(...) | 100% | 6 | 6 | 100%
FlushAsync(...) | 100% | 1 | 1 | 100%
GetMemory(...) | 100% | 1 | 1 | 100%
GetSpan(...) | 100% | 1 | 1 | 100%
WriteAsync(...) | 100% | 1 | 1 | 100%
WriteAsync() | 80% | 20.04 | 20 | 95.45%
AcquireSendCreditAsync() | 100% | 1 | 1 | 100%
CompleteWrites(...) | 100% | 1 | 1 | 100%
ConsumedSendCredit(...) | 100% | 2 | 2 | 100%
ReceivedWindowUpdateFrame(...) | 75% | 4.3 | 4 | 73.33%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable
 11#pragma warning disable CA1001
 12internal class SlicPipeWriter : ReadOnlySequencePipeWriter
 13#pragma warning restore CA1001
 14{
 115    public override bool CanGetUnflushedBytes => true; // UnflushedBytes below is implemented.
 16
 365317    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes; // Forwarded from the internal pipe's writer.
 18
 19    // We can avoid disposing _completeWritesCts because it was not created using CreateLinkedTokenSource, and it
 20    // doesn't use a timer. It is not easy to dispose it because CompleteWrites can be called by another thread after
 21    // Complete has been called.
 272422    private readonly CancellationTokenSource _completeWritesCts = new();
 23    private Exception? _exception; // Recorded by CompleteWrites (only while still null); read by WriteAsync.
 24    private bool _isCompleted; // Set by Complete; Advance and WriteAsync throw once it is true.
 272425    private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize; // Reassigned by the constructor.
 26    private readonly Pipe _pipe;
 27    // The semaphore is used when flow control is enabled to wait for additional send credit to be available.
 272428    private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1);
 29    private readonly SlicStream _stream;
 30
 31    public override void Advance(int bytes)
 981932    {
            // Reject the call once Complete has set _isCompleted; otherwise forward to the internal pipe's writer.
 981933        if (_isCompleted)
 034        {
 035            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 36        }
 981937        _pipe.Writer.Advance(bytes);
 981938    }
 39
 40    // SlicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 41    // installs a payload writer interceptor, this interceptor should never call it on "next".
        // Every call throws NotSupportedException.
 042    public override void CancelPendingFlush() => throw new NotSupportedException();
 43
 44    public override void Complete(Exception? exception = null)
 308445    {
            // Only the first call has an effect: it closes the stream writes (gracefully when exception is null) and
            // completes both the writer and the reader of the internal pipe. Later calls return immediately.
            // Throws InvalidOperationException when exception is null and the internal pipe still holds unflushed bytes.
            // NOTE(review): _isCompleted is set before that check, so when it throws, CloseWrites and the pipe
            // completion below are skipped and a later Complete call will not run them either - confirm this is intended.
            // NOTE(review): the semaphore comment below mentions AdjustPeerWindowSize, which is not defined in this
            // class; the window-update handler here is ReceivedWindowUpdateFrame - confirm and update the comment.
 308446        if (!_isCompleted)
 269747        {
 269748            _isCompleted = true;
 49
 269750            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 151            {
 152                throw new InvalidOperationException(
 153                    $"Completing a {nameof(SlicPipeWriter)} without an exception is not allowed when this pipe writer ha
 54            }
 55
 56            // If the exception is set, forcefully close the stream writes if writes were not already gracefully closed
 57            // by WriteAsync called with endStream=true. Otherwise, if exception is null, writes are gracefully closed.
 269658            _stream.CloseWrites(graceful: exception is null);
 59
 269660            _pipe.Writer.Complete();
 269661            _pipe.Reader.Complete();
 62
 63            // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException
 64            // from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream
 65            // output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore
 66            // is only useful when using SemaphoreSlim.AvailableWaitHandle.
 67            // _sendCreditSemaphore.Dispose();
 269668        }
 308369    }
 70
 71    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 72        // WriteAsync flushes the internal pipe and sends whatever it holds; the empty source adds no bytes and
        // endStream: false means a flush never ends the stream.
 42073        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 74
 275    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint); // Forwards to the internal pipe's writer.
 76
 981977    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint); // Forwards to the internal pipe's writer.
 78
 79    public override ValueTask<FlushResult> WriteAsync(
 80        ReadOnlyMemory<byte> source,
 81        CancellationToken cancellationToken) =>
        // Wraps the memory in a ReadOnlySequence and delegates to the sequence overload with endStream: false.
 564382        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 83
 84    public override async ValueTask<FlushResult> WriteAsync(
 85        ReadOnlySequence<byte> source,
 86        bool endStream,
 87        CancellationToken cancellationToken)
 788388    {
            // Sends the bytes buffered in the internal pipe (if any) together with source as a stream frame.
            // Throws InvalidOperationException once Complete has been called.
 788389        if (_isCompleted)
 090        {
 091            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 92        }
 93
 94        // Flush the pipe before the check for the closed connection. This makes sure that the check for unflushed data
 95        // on successful complete succeeds. See the Complete implementation above.
 788396        if (_pipe.Writer.UnflushedBytes > 0)
 222097        {
 222098            await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 222099        }
 100
 7883101        _stream.ThrowIfConnectionClosed();
 102
 103        // Abort the stream if the invocation is canceled.
            // The callback cancels _completeWritesCts, whose token is the one handed to WriteStreamFrameAsync below.
 7877104        using CancellationTokenRegistration cancelTokenRegistration = cancellationToken.UnsafeRegister(
 7105            cts => ((CancellationTokenSource)cts!).Cancel(),
 7877106            _completeWritesCts);
 107
            // When the internal pipe holds flushed bytes, they are passed as the first sequence and source as the
            // second; otherwise source is the first sequence and the second is empty.
 108        ReadOnlySequence<byte> source1;
 109        ReadOnlySequence<byte> source2;
 7877110        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2219111        {
 2219112            Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 2219113            source1 = readResult.Buffer;
 2219114            source2 = source;
 2219115        }
 116        else
 5658117        {
 5658118            source1 = source;
 5658119            source2 = ReadOnlySequence<byte>.Empty;
 5658120        }
 121
 7877122        if (source1.IsEmpty && source2.IsEmpty && !endStream)
 8123        {
 124            // WriteAsync is called with an empty buffer, typically by a call to FlushAsync. Some payload writers such
 125            // as the deflate compressor might do this.
 8126            return new FlushResult(isCanceled: false, isCompleted: false);
 127        }
 128
 129        try
 7869130        {
 7869131            return await _stream.WriteStreamFrameAsync(
 7869132                source1,
 7869133                source2,
 7869134                endStream,
 7869135                _completeWritesCts.Token).ConfigureAwait(false);
 136        }
 1014137        catch (OperationCanceledException)
 1014138        {
                // Cancellation requested through the caller's token is rethrown as a cancellation. Otherwise the
                // result depends on _exception (recorded by CompleteWrites): null yields a completed FlushResult,
                // non-null is thrown.
 1014139            cancellationToken.ThrowIfCancellationRequested();
 1008140            return _exception is null ?
 1008141                new FlushResult(isCanceled: false, isCompleted: true) :
 1008142                throw ExceptionUtil.Throw(_exception);
 143        }
 144        finally
 7869145        {
                // Mark the bytes read from the internal pipe as consumed, whether or not the write succeeded.
 7869146            if (readResult.Buffer.Length > 0)
 2219147            {
 2219148                _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 149
 150                // Make sure there's no more data to consume from the pipe.
 2219151                Debug.Assert(!_pipe.Reader.TryRead(out ReadResult _));
 2219152            }
 7869153        }
 7871154    }
 155
 2724156    internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
 2724157    {
 2724158        _stream = stream;
            // Replaces the field initializer value with the connection's PeerInitialStreamWindowSize.
 2724159        _peerWindowSize = connection.PeerInitialStreamWindowSize;
 160
 161        // Create a pipe that never pauses on flush or write. The SlicPipeWriter will pause the flush or write if
 162        // the Slic flow control doesn't permit sending more data.
 163        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 164        // matter (_pipe.Writer.FlushAsync never blocks).
 2724165        _pipe = new(new PipeOptions(
 2724166            pool: connection.Pool,
 2724167            minimumSegmentSize: connection.MinSegmentSize,
 2724168            pauseWriterThreshold: 0,
 2724169            useSynchronizationContext: false));
 2724170    }
 171
 172    /// <summary>Acquires send credit.</summary>
 173    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 174    /// <returns>The available send credit.</returns>
 175    /// <remarks>The send credit matches the size of the peer's flow-control window. The send credit semaphore is
        /// still held when this method returns; it is released by <see cref="ConsumedSendCredit" /> when credit remains,
        /// or by <see cref="ReceivedWindowUpdateFrame" /> otherwise.</remarks>
 176    internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken)
 10178177    {
 178        // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the
 179        // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send
 180        // additional data (_peerWindowSize <= 0).
 10178181        await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 9164182        return _peerWindowSize;
 9164183    }
 184
 185    /// <summary>Completes writes: records <paramref name="exception" /> unless an exception was already recorded,
        /// then cancels the token source whose token is passed to the stream frame writes.</summary>
 186    /// <param name="exception">The exception that will be raised by <see cref="PipeWriter.WriteAsync" /> or <see
 187    /// cref="FlushAsync" />.</param>
 188    internal void CompleteWrites(Exception? exception)
 1660189    {
            // CompareExchange only stores the exception while _exception is still null.
 1660190        Interlocked.CompareExchange(ref _exception, exception, null);
 1660191        _completeWritesCts.Cancel();
 1660192    }
 193
 194    /// <summary>Notifies the writer of the amount of send credit consumed by the sending of a stream frame.</summary>
 195    /// <param name="size">The size of the stream frame.</param>
 196    internal void ConsumedSendCredit(int size)
 9164197    {
 9164198        Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired.
 199
 200        // Release the semaphore if the peer's window size is still greater than 0. Otherwise the semaphore stays
            // acquired until ReceivedWindowUpdateFrame releases it.
 9164201        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size);
 9164202        if (newPeerWindowSize > 0)
 7509203        {
 7509204            _sendCreditSemaphore.Release();
 7509205        }
 9164206    }
 207
 208    /// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 209    /// <param name="size">The window size increment.</param>
        /// <exception cref="IceRpcException">Thrown when the increment takes the peer window size above
        /// <see cref="SlicTransportOptions.MaxWindowSize" />.</exception>
 210    internal void ReceivedWindowUpdateFrame(int size)
 968211    {
 968212        Debug.Assert(size > 0);
 213
            // NOTE(review): the increment is applied before it is validated, so _peerWindowSize keeps the oversized
            // value when the exception below is thrown - confirm callers treat this as fatal.
 968214        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size);
 968215        if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize)
 0216        {
 0217            throw new IceRpcException(
 0218                IceRpcError.IceRpcError,
 0219                $"The window update is trying to increase the window size to a value larger than allowed.");
 220        }
 221
 968222        int previousPeerWindowSize = newPeerWindowSize - size;
 223
 224        // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't
 225        // release the semaphore. We can now release the semaphore to allow another write to send data.
 968226        if (previousPeerWindowSize == 0)
 648227        {
 648228            Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
 648229            _sendCreditSemaphore.Release();
 648230        }
 968231    }
 232}