< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicPipeWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs
Tag: 1321_24790053727
Line coverage
92%
Covered lines: 106
Uncovered lines: 9
Coverable lines: 115
Total lines: 232
Line coverage: 92.1%
Branch coverage
82%
Covered branches: 28
Total branches: 34
Branch coverage: 82.3%
Method coverage
93%
Covered methods: 14
Fully covered methods: 10
Total methods: 15
Method coverage: 93.3%
Full method coverage: 66.6%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_CanGetUnflushedBytes()100%11100%
get_UnflushedBytes()100%11100%
.ctor(...)100%11100%
Advance(...)50%2266.66%
CancelPendingFlush()100%210%
Complete(...)100%66100%
FlushAsync(...)100%11100%
GetMemory(...)100%11100%
GetSpan(...)100%11100%
WriteAsync(...)100%11100%
WriteAsync()80%202095.45%
AcquireSendCreditAsync()100%11100%
CompleteWrites(...)100%11100%
ConsumedSendCredit(...)100%22100%
ReceivedWindowUpdateFrame(...)75%4473.33%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Transports.Slic.Internal;
 9
 10// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable
 11#pragma warning disable CA1001
 12internal class SlicPipeWriter : ReadOnlySequencePipeWriter
 13#pragma warning restore CA1001
 14{
 115    public override bool CanGetUnflushedBytes => true;
 16
 367117    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
 18
 19    // We can avoid disposing _completeWritesCts because it was not created using CreateLinkedTokenSource, and it
 20    // doesn't use a timer. It is not easy to dispose it because CompleteWrites can be called by another thread after
 21    // Complete has been called.
 274022    private readonly CancellationTokenSource _completeWritesCts = new();
 23    private Exception? _exception;
 24    private bool _isCompleted;
 274025    private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize;
 26    private readonly Pipe _pipe;
 27    // The semaphore is used when flow control is enabled to wait for additional send credit to be available.
 274028    private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1);
 29    private readonly SlicStream _stream;
 30
 31    public override void Advance(int bytes)
 988132    {
 988133        if (_isCompleted)
 034        {
 035            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 36        }
 988137        _pipe.Writer.Advance(bytes);
 988138    }
 39
 40    // SlicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 41    // installs a payload writer interceptor, this interceptor should never call it on "next".
 042    public override void CancelPendingFlush() => throw new NotSupportedException();
 43
 44    public override void Complete(Exception? exception = null)
 310245    {
 310246        if (!_isCompleted)
 271347        {
 271348            _isCompleted = true;
 49
 271350            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 151            {
 152                throw new InvalidOperationException(
 153                    $"Completing a {nameof(SlicPipeWriter)} without an exception is not allowed when this pipe writer ha
 54            }
 55
 56            // If the exception is set, forcefully close the stream writes if writes were not already gracefully closed
 57            // by WriteAsync called with endStream=true. Otherwise, if exception is null, writes are gracefully closed.
 271258            _stream.CloseWrites(graceful: exception is null);
 59
 271260            _pipe.Writer.Complete();
 271261            _pipe.Reader.Complete();
 62
 63            // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException
 64            // from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream
 65            // output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore
 66            // is only useful when using SemaphoreSlim.AvailableWaitHandle.
 67            // _sendCreditSemaphore.Dispose();
 271268        }
 310169    }
 70
 71    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 72        // WriteAsync will flush the internal buffer
 42973        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 74
 275    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint);
 76
 988177    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint);
 78
 79    public override ValueTask<FlushResult> WriteAsync(
 80        ReadOnlyMemory<byte> source,
 81        CancellationToken cancellationToken) =>
 564382        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 83
 84    public override async ValueTask<FlushResult> WriteAsync(
 85        ReadOnlySequence<byte> source,
 86        bool endStream,
 87        CancellationToken cancellationToken)
 790188    {
 790189        if (_isCompleted)
 090        {
 091            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 92        }
 93
 94        // Flush the pipe before the check for the close connection. This makes sure that the check for unflushed data
 95        // on successful compete succeeds. See the Complete implementation above.
 790196        if (_pipe.Writer.UnflushedBytes > 0)
 223997        {
 223998            await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 223999        }
 100
 7901101        _stream.ThrowIfConnectionClosed();
 102
 103        // Abort the stream if the invocation is canceled.
 7896104        using CancellationTokenRegistration cancelTokenRegistration = cancellationToken.UnsafeRegister(
 4105            cts => ((CancellationTokenSource)cts!).Cancel(),
 7896106            _completeWritesCts);
 107
 108        ReadOnlySequence<byte> source1;
 109        ReadOnlySequence<byte> source2;
 7896110        if (_pipe.Reader.TryRead(out ReadResult readResult))
 2239111        {
 2239112            Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 2239113            source1 = readResult.Buffer;
 2239114            source2 = source;
 2239115        }
 116        else
 5657117        {
 5657118            source1 = source;
 5657119            source2 = ReadOnlySequence<byte>.Empty;
 5657120        }
 121
 7896122        if (source1.IsEmpty && source2.IsEmpty && !endStream)
 7123        {
 124            // WriteAsync is called with an empty buffer, typically by a call to FlushAsync. Some payload writers such
 125            // as the deflate compressor might do this.
 7126            return new FlushResult(isCanceled: false, isCompleted: false);
 127        }
 128
 129        try
 7889130        {
 7889131            return await _stream.WriteStreamFrameAsync(
 7889132                source1,
 7889133                source2,
 7889134                endStream,
 7889135                _completeWritesCts.Token).ConfigureAwait(false);
 136        }
 1013137        catch (OperationCanceledException)
 1013138        {
 1013139            cancellationToken.ThrowIfCancellationRequested();
 1009140            return _exception is null ?
 1009141                new FlushResult(isCanceled: false, isCompleted: true) :
 1009142                throw ExceptionUtil.Throw(_exception);
 143        }
 144        finally
 7889145        {
 7889146            if (readResult.Buffer.Length > 0)
 2239147            {
 2239148                _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 149
 150                // Make sure there's no more data to consume from the pipe.
 2239151                Debug.Assert(!_pipe.Reader.TryRead(out ReadResult _));
 2239152            }
 7889153        }
 7892154    }
 155
 2740156    internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
 2740157    {
 2740158        _stream = stream;
 2740159        _peerWindowSize = connection.PeerInitialStreamWindowSize;
 160
 161        // Create a pipe that never pauses on flush or write. The SlicePipeWriter will pause the flush or write if
 162        // the Slic flow control doesn't permit sending more data.
 163        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 164        // matter (_pipe.Writer.FlushAsync never blocks).
 2740165        _pipe = new(new PipeOptions(
 2740166            pool: connection.Pool,
 2740167            minimumSegmentSize: connection.MinSegmentSize,
 2740168            pauseWriterThreshold: 0,
 2740169            useSynchronizationContext: false));
 2740170    }
 171
 172    /// <summary>Acquires send credit.</summary>
 173    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 174    /// <returns>The available send credit.</returns>
 175    /// <remarks>The send credit matches the size of the peer's flow-control window.</remarks>
 176    internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken)
 10160177    {
 178        // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the
 179        // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send
 180        // additional data (_peerWindowSize <= 0).
 10160181        await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 9147182        return _peerWindowSize;
 9147183    }
 184
 185    /// <summary>Complete writes.</summary>
 186    /// <param name="exception">The exception that will be raised by <see cref="PipeWriter.WriteAsync" /> or <see
 187    /// cref="FlushAsync" />.</param>
 188    internal void CompleteWrites(Exception? exception)
 1674189    {
 1674190        Interlocked.CompareExchange(ref _exception, exception, null);
 1674191        _completeWritesCts.Cancel();
 1674192    }
 193
 194    /// <summary>Notifies the writer of the amount of send credit consumed by the sending of a stream frame.</summary>
 195    /// <param name="size">The size of the stream frame.</param>
 196    internal void ConsumedSendCredit(int size)
 9147197    {
 9147198        Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired.
 199
 200        // Release the semaphore if the peer's window size is still superior to 0
 9147201        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size);
 9147202        if (newPeerWindowSize > 0)
 7598203        {
 7598204            _sendCreditSemaphore.Release();
 7598205        }
 9147206    }
 207
 208    /// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 209    /// <param name="size">The window size increment.</param>
 210    internal void ReceivedWindowUpdateFrame(int size)
 1123211    {
 1123212        Debug.Assert(size > 0);
 213
 1123214        int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size);
 1123215        if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize)
 0216        {
 0217            throw new IceRpcException(
 0218                IceRpcError.IceRpcError,
 0219                $"The window update is trying to increase the window size to a value larger than allowed.");
 220        }
 221
 1123222        int previousPeerWindowSize = newPeerWindowSize - size;
 223
 224        // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't
 225        // release the semaphore. We can now release the semaphore to allow another write to send data.
 1123226        if (previousPeerWindowSize == 0)
 546227        {
 546228            Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
 546229            _sendCreditSemaphore.Release();
 546230        }
 1123231    }
 232}