< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs
Tag: 1856_27024993493
Line coverage
95%
Covered lines: 109
Uncovered lines: 5
Coverable lines: 114
Total lines: 234
Line coverage: 95.6%
Branch coverage
88%
Covered branches: 30
Total branches: 34
Branch coverage: 88.2%
Method coverage
93%
Covered methods: 14
Fully covered methods: 11
Total methods: 15
Method coverage: 93.3%
Full method coverage: 73.3%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_CanGetUnflushedBytes()100%11100%
get_UnflushedBytes()100%11100%
.ctor(...)100%11100%
Advance(...)50%2266.66%
CancelPendingFlush()100%210%
Complete(...)100%66100%
FlushAsync(...)100%11100%
GetMemory(...)100%11100%
GetSpan(...)100%11100%
WriteAsync(...)100%11100%
WriteAsync()85%202095.45%
AcquireSendCreditAsync()100%11100%
CompleteWrites(...)100%11100%
ConsumedSendCredit(...)100%22100%
ReceivedWindowUpdateFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable
 11#pragma warning disable CA1001
 12internal class SlicPipeWriter : ReadOnlySequencePipeWriter
 13#pragma warning restore CA1001
 14{
 115    public override bool CanGetUnflushedBytes => true;
 16
 368117    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
 18
 19    // We can avoid disposing _completeWritesCts because it was not created using CreateLinkedTokenSource, and it
 20    // doesn't use a timer. It is not easy to dispose it because CompleteWrites can be called by another thread after
 21    // Complete has been called.
 280222    private readonly CancellationTokenSource _completeWritesCts = new();
 23    private Exception? _exception;
 24    private bool _isCompleted;
 25    private volatile int _peerWindowSize;
 26    private readonly Pipe _pipe;
 27    // The semaphore is used when flow control is enabled to wait for additional send credit to be available.
 280228    private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1);
 29    private readonly SlicStream _stream;
 30
 31    public override void Advance(int bytes)
 996932    {
 996933        if (_isCompleted)
 034        {
 035            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 36        }
 996937        _pipe.Writer.Advance(bytes);
 996938    }
 39
 40    // SlicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 41    // installs a payload writer interceptor, this interceptor should never call it on "next".
 042    public override void CancelPendingFlush() => throw new NotSupportedException();
 43
 44    public override void Complete(Exception? exception = null)
 316345    {
 316346        if (!_isCompleted)
 277547        {
 277548            _isCompleted = true;
 49
 277550            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 151            {
 152                throw new InvalidOperationException(
 153                    $"Completing a {nameof(SlicPipeWriter)} without an exception is not allowed when this pipe writer ha
 54            }
 55
 56            // If the exception is set, forcefully close the stream writes if writes were not already gracefully closed
 57            // by WriteAsync called with endStream=true. Otherwise, if exception is null, writes are gracefully closed.
 277458            _stream.CloseWrites(graceful: exception is null);
 59
 277460            _pipe.Writer.Complete();
 277461            _pipe.Reader.Complete();
 62
 63            // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException
 64            // from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream
 65            // output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore
 66            // is only useful when using SemaphoreSlim.AvailableWaitHandle.
 67            // _sendCreditSemaphore.Dispose();
 277468        }
 316269    }
 70
 71    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 72        // WriteAsync will flush the internal buffer
 45473        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 74
 275    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint);
 76
 996977    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint);
 78
 79    public override ValueTask<FlushResult> WriteAsync(
 80        ReadOnlyMemory<byte> source,
 81        CancellationToken cancellationToken) =>
 566782        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 83
 84    public override async ValueTask<FlushResult> WriteAsync(
 85        ReadOnlySequence<byte> source,
 86        bool endStream,
 87        CancellationToken cancellationToken)
 795588    {
 795589        if (_isCompleted)
 090        {
 091            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 92        }
 93
 94        // Flush the pipe before the check for the close connection. This makes sure that the check for unflushed data
 95        // on successful compete succeeds. See the Complete implementation above.
 795596        if (_pipe.Writer.UnflushedBytes > 0)
 226897        {
 226898            await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 226899        }
 100
 7955101        _stream.ThrowIfConnectionClosed();
 102
 103        // Abort the stream if the invocation is canceled.
 7950104        using CancellationTokenRegistration cancelTokenRegistration = cancellationToken.UnsafeRegister(
 7105            cts => ((CancellationTokenSource)cts!).Cancel(),
 7950106            _completeWritesCts);
 107
 108        ReadOnlySequence<byte> source1;
 109        ReadOnlySequence<byte> source2;
 7950110        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2268111        {
 2268112            Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 2268113            source1 = readResult.Buffer;
 2268114            source2 = source;
 2268115        }
 116        else
 5682117        {
 5682118            source1 = source;
 5682119            source2 = ReadOnlySequence<byte>.Empty;
 5682120        }
 121
 7950122        if (source1.IsEmpty && source2.IsEmpty && !endStream)
 8123        {
 124            // WriteAsync is called with an empty buffer, typically by a call to FlushAsync. Some payload writers such
 125            // as the deflate compressor might do this.
 8126            return new FlushResult(isCanceled: false, isCompleted: false);
 127        }
 128
 129        try
 7942130        {
 7942131            return await _stream.WriteStreamFrameAsync(
 7942132                source1,
 7942133                source2,
 7942134                endStream,
 7942135                _completeWritesCts.Token).ConfigureAwait(false);
 136        }
 1014137        catch (OperationCanceledException)
 1014138        {
 1014139            cancellationToken.ThrowIfCancellationRequested();
 1007140            return _exception is null ?
 1007141                new FlushResult(isCanceled: false, isCompleted: true) :
 1007142                throw ExceptionUtil.Throw(_exception);
 143        }
 144        finally
 7942145        {
 7942146            if (readResult.Buffer.Length > 0)
 2268147            {
 2268148                _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 149
 150                // Make sure there's no more data to consume from the pipe.
 2268151                Debug.Assert(!_pipe.Reader.TryRead(out ReadResult _));
 2268152            }
 7942153        }
 7942154    }
 155
 2802156    internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
 2802157    {
 2802158        _stream = stream;
 2802159        _peerWindowSize = connection.PeerInitialStreamWindowSize;
 160
 161        // Create a pipe that never pauses on flush or write. The SlicePipeWriter will pause the flush or write if
 162        // the Slic flow control doesn't permit sending more data.
 163        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 164        // matter (_pipe.Writer.FlushAsync never blocks).
 2802165        _pipe = new(new PipeOptions(
 2802166            pool: connection.Pool,
 2802167            minimumSegmentSize: connection.MinSegmentSize,
 2802168            pauseWriterThreshold: 0,
 2802169            useSynchronizationContext: false));
 2802170    }
 171
 172    /// <summary>Acquires send credit.</summary>
 173    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 174    /// <returns>The available send credit.</returns>
 175    /// <remarks>The send credit matches the size of the peer's flow-control window.</remarks>
 176    internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken)
 9223177    {
 178        // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the
 179        // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send
 180        // additional data (_peerWindowSize <= 0).
 9223181        await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 9197182        return _peerWindowSize;
 9197183    }
 184
 185    /// <summary>Complete writes.</summary>
 186    /// <param name="exception">The exception that will be raised by <see cref="PipeWriter.WriteAsync" /> or <see
 187    /// cref="FlushAsync" />.</param>
 188    internal void CompleteWrites(Exception? exception)
 1704189    {
 1704190        Interlocked.CompareExchange(ref _exception, exception, null);
 1704191        _completeWritesCts.Cancel();
 1704192    }
 193
 194    /// <summary>Notifies the writer of the amount of send credit consumed by the sending of a stream frame.</summary>
 195    /// <param name="size">The size of the stream frame.</param>
 196    internal void ConsumedSendCredit(int size)
 8214197    {
 8214198        Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired.
 199
 200        // Release the semaphore if the peer's window size is still superior to 0
 8214201        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size);
 8214202        if (newPeerWindowSize > 0)
 7917203        {
 7917204            _sendCreditSemaphore.Release();
 7917205        }
 8214206    }
 207
 208    /// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 209    /// <param name="size">The window size increment.</param>
 210    internal void ReceivedWindowUpdateFrame(int size)
 1216211    {
 1216212        Debug.Assert(size > 0);
 213
 214        // With _peerWindowSize >= 0 and size > 0, the atomic sum is positive unless it overflows int.MaxValue
 215        // (MaxWindowSize), in which case it wraps to a non-positive value.
 1216216        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size);
 1216217        if (newPeerWindowSize <= 0)
 1218        {
 1219            throw new IceRpcException(
 1220                IceRpcError.IceRpcError,
 1221                $"The window update is trying to increase the window size to a value larger than allowed.");
 222        }
 223
 1215224        int previousPeerWindowSize = newPeerWindowSize - size;
 225
 226        // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't
 227        // release the semaphore. We can now release the semaphore to allow another write to send data.
 1215228        if (previousPeerWindowSize == 0)
 289229        {
 289230            Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
 289231            _sendCreditSemaphore.Release();
 289232        }
 1215233    }
 234}