< Summary

Information
Class: IceRpc.Transports.Quic.Internal.QuicPipeWriter
Assembly: IceRpc.Transports.Quic
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Quic/Internal/QuicPipeWriter.cs
Tag: 275_13775359185
Line coverage
95%
Covered lines: 122
Uncovered lines: 6
Coverable lines: 128
Total lines: 234
Line coverage: 95.3%
Branch coverage
84%
Covered branches: 32
Total branches: 38
Branch coverage: 84.2%
Method coverage
92%
Covered methods: 13
Total methods: 14
Method coverage: 92.8%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_CanGetUnflushedBytes() | 100% | 1 | 1 | 100%
get_UnflushedBytes() | 100% | 1 | 1 | 100%
get_Closed() | 100% | 1 | 1 | 100%
Advance(...) | 100% | 1 | 1 | 100%
CancelPendingFlush() | 100% | 2 | 1 | 0%
Complete(...) | 100% | 8 | 8 | 100%
FlushAsync(...) | 100% | 1 | 1 | 100%
GetMemory(...) | 100% | 1 | 1 | 100%
GetSpan(...) | 100% | 1 | 1 | 100%
WriteAsync(...) | 100% | 1 | 1 | 100%
WriteAsync() | 79.16% | 24.43 | 24 | 90.9%
WriteSequenceAsync() | 83.33% | 6 | 6 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
ClosedAsync() | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Quic/Internal/QuicPipeWriter.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6using System.Net.Quic;
 7using System.Net.Sockets;
 8
 9namespace IceRpc.Transports.Quic.Internal;
 10
 11internal class QuicPipeWriter : ReadOnlySequencePipeWriter
 12{
 // Always true: this writer reports its unflushed byte count through UnflushedBytes.
 213    public override bool CanGetUnflushedBytes => true;
 14
 // The number of unflushed bytes, as reported by the writer of the internal helper pipe.
 215    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
 16
 // Assigned once by the constructor. This task completes when _stream.WritesClosed completes; a failure of
 // WritesClosed is swallowed, so awaiting this task does not surface that failure.
 2417    internal Task Closed { get; }
 18
 // Set to true by Complete; WriteAsync throws InvalidOperationException once it is set.
 19    private bool _isCompleted;
 // Invoked by Complete, the first time it completes this writer.
 20    private readonly Action _completeCallback;
 // Used as the minimum segment size of _pipe and as the size limit under which WriteAsync tries to copy the
 // source into _pipe when _pipe already holds unflushed bytes.
 21    private readonly int _minSegmentSize;
 22
 23    // We use a helper Pipe instead of a StreamPipeWriter over _stream because StreamPipeWriter does not provide a
 24    // WriteAsync with an endStream/completeWrites parameter while QuicStream does.
 25    private readonly Pipe _pipe;
 // The Quic stream that receives the bytes written through this pipe writer.
 26    private readonly QuicStream _stream;
 27
 // Invoked at the start of WriteAsync, before anything is written to _stream.
 28    private readonly Action _throwIfConnectionClosedOrDisposed;
 29
 // Forwards to the writer of the internal helper pipe; nothing is written to the Quic stream here.
 1430    public override void Advance(int bytes) => _pipe.Writer.Advance(bytes);
 31
 32    // QuicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 33    // installs a payload writer interceptor, this interceptor should never call it on "next".
 // Always throws NotSupportedException.
 034    public override void CancelPendingFlush() => throw new NotSupportedException();
 35
 // Completes this writer. Only the first successful call has an effect; later calls return immediately.
 // - With a null exception, throws InvalidOperationException when the internal pipe still holds unflushed bytes.
 //   This check runs before _isCompleted is set, so the writer is not marked completed when it throws.
 // - With a null exception, completes the writes on the Quic stream; otherwise aborts the write side of the Quic
 //   stream with error code 0.
 // - Then invokes the complete callback and completes both ends of the internal pipe.
 36    public override void Complete(Exception? exception = null)
 160837    {
 160838        if (!_isCompleted)
 100439        {
 100440            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 241            {
 242                throw new InvalidOperationException(
 243                    $"Completing a {nameof(QuicPipeWriter)} without an exception is not allowed when this pipe writer ha
 44            }
 45
 100246            _isCompleted = true;
 47
 100248            if (exception is null)
 99249            {
 50                // Unlike Slic, it's important to complete the writes and not abort the stream. Data might still be
 51                // buffered for send on the Quic stream and aborting the stream would discard this data.
 99252                _stream.CompleteWrites();
 99253            }
 54            else
 1055            {
 56                // We don't use the application error code, it's irrelevant.
 1057                _stream.Abort(QuicAbortDirection.Write, errorCode: 0);
 1058            }
 59
 100260            _completeCallback();
 61
 100262            _pipe.Writer.Complete();
 100263            _pipe.Reader.Complete();
 100264        }
 160665    }
 66
 // Flushes by calling WriteAsync with an empty source and endStream set to false.
 67    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 68        // WriteAsync will flush the internal buffer
 469        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 70
 // Returns memory from the writer of the internal helper pipe; the size hint is passed through unchanged.
 471    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint);
 72
 // Returns a span from the writer of the internal helper pipe; the size hint is passed through unchanged.
 1073    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint);
 74
 // Wraps source in a single-segment ReadOnlySequence<byte> and delegates to the sequence overload of WriteAsync
 // with endStream set to false.
 75    public override ValueTask<FlushResult> WriteAsync(
 76        ReadOnlyMemory<byte> source,
 77        CancellationToken cancellationToken) =>
 1119178        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 79
 // Writes the bytes buffered in the internal pipe (if any) followed by source to the Quic stream.
 //
 // source: the bytes to write; can be empty, in which case only the buffered bytes are written.
 // endStream: when true, the last write on the Quic stream is issued with completeWrites set to true.
 // cancellationToken: passed to the writes on the Quic stream. The flush of the internal pipe uses
 // CancellationToken.None.
 //
 // Returns a FlushResult with IsCanceled false. IsCompleted is endStream on a normal return (false when there was
 // nothing to write), and true when a write fails with a QuicException whose QuicError is StreamAborted.
 //
 // Throws InvalidOperationException when this writer is already completed. Any other QuicException, and any
 // SocketException, caught by the try block is replaced by the exception returned by ToIceRpcException().
 80    public override async ValueTask<FlushResult> WriteAsync(
 81        ReadOnlySequence<byte> source,
 82        bool endStream,
 83        CancellationToken cancellationToken)
 1120984    {
 1120985        if (_isCompleted)
 086        {
 087            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 88        }
 89
            // Called outside the try block below: whatever this callback throws propagates without conversion.
 1120990        _throwIfConnectionClosedOrDisposed();
 91
 92        try
 1120593        {
 1120594            if (_pipe.Writer.UnflushedBytes > 0)
 1095            {
 1096                if (!source.IsEmpty && source.Length < _minSegmentSize)
 697                {
 98                    // When source fits in the last segment of _pipe.Writer, we copy it into _pipe.Writer.
 99
 6100                    Memory<byte> pipeMemory = _pipe.Writer.GetMemory();
 6101                    if (source.Length <= pipeMemory.Length)
 4102                    {
 4103                        source.CopyTo(pipeMemory.Span);
 4104                        _pipe.Writer.Advance((int)source.Length);
                            // source is now part of the buffered bytes; nothing is left to write separately.
 4105                        source = ReadOnlySequence<byte>.Empty;
 4106                    }
 107                    else
 2108                    {
                            // NOTE(review): presumably pairs with the GetMemory call above without committing any
                            // bytes - confirm against the PipeWriter documentation.
 2109                        _pipe.Writer.Advance(0);
 2110                    }
 6111                }
 112
 113                // Flush the internal pipe.
 10114                FlushResult flushResult = await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 10115                Debug.Assert(!flushResult.IsCanceled && !flushResult.IsCompleted);
 116
 117                // Read the data from the pipe.
 10118                bool tryReadOk = _pipe.Reader.TryRead(out ReadResult readResult);
 10119                Debug.Assert(tryReadOk);
 10120                Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 121
 122                try
 10123                {
 124                    // Write buffered data to the stream
                        // The stream is ended with this write only when no source bytes remain to be written after it.
 10125                    await WriteSequenceAsync(
 10126                        readResult.Buffer,
 10127                        completeWrites: endStream && source.IsEmpty,
 10128                        cancellationToken).ConfigureAwait(false);
 8129                }
 130                finally
 10131                {
                        // Runs whether or not the write above succeeded: the buffered bytes are marked as consumed on
                        // the internal pipe either way.
 10132                    _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 10133                }
 134
 8135                if (source.IsEmpty)
 6136                {
 137                    // We're done, we don't want to write again an empty sequence.
 6138                    return new FlushResult(isCanceled: false, isCompleted: endStream);
 139                }
 2140            }
 141
 11197142            if (source.IsEmpty && !endStream)
 4143            {
 144                // Nothing to do; this typically corresponds to a call to FlushAsync when there were no unflushed bytes.
 4145                return new FlushResult(isCanceled: false, isCompleted: false);
 146            }
 147            else
 11193148            {
 11193149                await WriteSequenceAsync(source, completeWrites: endStream, cancellationToken).ConfigureAwait(false);
 11181150                return new FlushResult(isCanceled: false, isCompleted: endStream);
 151            }
 152        }
 12153        catch (QuicException exception) when (exception.QuicError == QuicError.StreamAborted)
 8154        {
                // An aborted stream is reported through the result (IsCompleted is true), not by throwing.
 8155            return new FlushResult(isCanceled: false, isCompleted: true);
 156        }
 4157        catch (QuicException exception)
 4158        {
 4159            throw exception.ToIceRpcException();
 160        }
 0161        catch (SocketException exception)
 0162        {
 0163            throw exception.ToIceRpcException();
 164        }
 165        // We don't wrap other exceptions
 166
        // Writes sequence to the Quic stream, one WriteAsync call per segment. When completeWrites is true, only the
        // call for the last segment is made with completeWrites set to true.
 167        async ValueTask WriteSequenceAsync(
 168            ReadOnlySequence<byte> sequence,
 169            bool completeWrites,
 170            CancellationToken cancellationToken)
 11203171        {
 11203172            if (sequence.IsSingleSegment)
 11199173            {
 11199174                await _stream.WriteAsync(sequence.First, completeWrites, cancellationToken).ConfigureAwait(false);
 11185175            }
 176            else
 4177            {
 4178                ReadOnlySequence<byte>.Enumerator enumerator = sequence.GetEnumerator();
 4179                bool hasMore = enumerator.MoveNext();
 4180                Debug.Assert(hasMore);
 181                do
 10182                {
 10183                    ReadOnlyMemory<byte> buffer = enumerator.Current;
                        // Move to the next segment before writing, so that hasMore tells whether buffer is the last one.
 10184                    hasMore = enumerator.MoveNext();
 10185                    await _stream.WriteAsync(buffer, completeWrites: completeWrites && !hasMore, cancellationToken)
 10186                        .ConfigureAwait(false);
 10187                }
 10188                while (hasMore);
 4189            }
 11189190        }
 11199191    }
 192
 // Constructs a pipe writer over a Quic stream.
 //
 // stream: the Quic stream to write to.
 // pool: the memory pool used by the internal helper pipe.
 // minSegmentSize: the minimum segment size of the internal helper pipe; WriteAsync also uses it as the size limit
 // under which it tries to copy the source into the pipe when the pipe already holds unflushed bytes.
 // completeCallback: invoked by Complete the first time the writer is completed.
 // throwIfConnectionClosedOrDisposed: invoked at the start of WriteAsync (see the comment in the body).
 1042193    internal QuicPipeWriter(
 1042194        QuicStream stream,
 1042195        MemoryPool<byte> pool,
 1042196        int minSegmentSize,
 1042197        Action completeCallback,
 1042198        Action throwIfConnectionClosedOrDisposed)
 1042199    {
 1042200        _stream = stream;
 1042201        _minSegmentSize = minSegmentSize;
 1042202        _completeCallback = completeCallback;
 203
 204        // This callback is used to check if the connection is closed or disposed before calling WriteAsync on the Quic
 205        // stream. This check works around the use of the QuicError.OperationAborted error code for both reporting the
 206        // abortion of the in-progress write call and for reporting a closed connection before the operation process
 207        // starts. In this latter case, we want to report ConnectionAborted.
 1042208        _throwIfConnectionClosedOrDisposed = throwIfConnectionClosedOrDisposed;
 209
 210        // Create a pipe that never pauses on flush or write. The Quic _stream.WriteAsync will block if the Quic flow
 211        // control doesn't permit sending more data.
 212        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 213        // matter (_pipe.Writer.FlushAsync never blocks).
 1042214        _pipe = new(new PipeOptions(
 1042215            pool: pool,
 1042216            minimumSegmentSize: minSegmentSize,
 1042217            pauseWriterThreshold: 0,
 1042218            useSynchronizationContext: false));
 219
            // Start observing the closure of the stream's write side now; the resulting task is exposed by Closed.
 1042220        Closed = ClosedAsync();
 221
            // Completes when _stream.WritesClosed completes. Exceptions from WritesClosed are caught and discarded, so
            // they are not propagated to whoever awaits the returned task.
 222        async Task ClosedAsync()
 1042223        {
 224            try
 1042225            {
 1042226                await _stream.WritesClosed.ConfigureAwait(false);
 816227            }
 226228            catch
 226229            {
 230                // Ignore failures.
 226231            }
 1042232        }
 1042233    }
 234}