< Summary

Information
Class: IceRpc.Transports.Quic.Internal.QuicPipeWriter
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Quic/Internal/QuicPipeWriter.cs
Tag: 592_20856082467
Line coverage
95%
Covered lines: 122
Uncovered lines: 6
Coverable lines: 128
Total lines: 238
Line coverage: 95.3%
Branch coverage
84%
Covered branches: 32
Total branches: 38
Branch coverage: 84.2%
Method coverage
92%
Covered methods: 13
Total methods: 14
Method coverage: 92.8%

Metrics

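| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| get_CanGetUnflushedBytes() | 100% | 1 | 1 | 100% |
| get_UnflushedBytes() | 100% | 1 | 1 | 100% |
| get_Closed() | 100% | 1 | 1 | 100% |
| Advance(...) | 100% | 1 | 1 | 100% |
| CancelPendingFlush() | 100% | 2 | 1 | 0% |
| Complete(...) | 100% | 8 | 8 | 100% |
| FlushAsync(...) | 100% | 1 | 1 | 100% |
| GetMemory(...) | 100% | 1 | 1 | 100% |
| GetSpan(...) | 100% | 1 | 1 | 100% |
| WriteAsync(...) | 100% | 1 | 1 | 100% |
| WriteAsync() | 79.16% | 24.43 | 24 | 90.9% |
| WriteSequenceAsync() | 83.33% | 6 | 6 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ClosedAsync() | 100% | 1 | 1 | 100% |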

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Quic/Internal/QuicPipeWriter.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6using System.Net.Quic;
 7using System.Net.Sockets;
 8using System.Runtime.Versioning;
 9
 10namespace IceRpc.Transports.Quic.Internal;
 11
 12[SupportedOSPlatform("linux")]
 13[SupportedOSPlatform("macos")]
 14[SupportedOSPlatform("windows")]
 15internal class QuicPipeWriter : ReadOnlySequencePipeWriter
 16{
 117    public override bool CanGetUnflushedBytes => true;
 18
 119    public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
 20
 1221    internal Task Closed { get; }
 22
 23    private bool _isCompleted;
 24    private readonly Action _completeCallback;
 25    private readonly int _minSegmentSize;
 26
 27    // We use a helper Pipe instead of a StreamPipeWriter over _stream because StreamPipeWriter does not provide a
 28    // WriteAsync with an endStream/completeWrites parameter while QuicStream does.
 29    private readonly Pipe _pipe;
 30    private readonly QuicStream _stream;
 31
 32    private readonly Action _throwIfConnectionClosedOrDisposed;
 33
 734    public override void Advance(int bytes) => _pipe.Writer.Advance(bytes);
 35
 36    // QuicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code
 37    // installs a payload writer interceptor, this interceptor should never call it on "next".
 038    public override void CancelPendingFlush() => throw new NotSupportedException();
 39
 40    public override void Complete(Exception? exception = null)
 80541    {
 80542        if (!_isCompleted)
 50343        {
 50344            if (exception is null && _pipe.Writer.UnflushedBytes > 0)
 145            {
 146                throw new InvalidOperationException(
 147                    $"Completing a {nameof(QuicPipeWriter)} without an exception is not allowed when this pipe writer ha
 48            }
 49
 50250            _isCompleted = true;
 51
 50252            if (exception is null)
 49753            {
 54                // Unlike Slic, it's important to complete the writes and not abort the stream. Data might still be
 55                // buffered for send on the QUIC stream and aborting the stream would discard this data.
 49756                _stream.CompleteWrites();
 49757            }
 58            else
 559            {
 60                // We don't use the application error code, it's irrelevant.
 561                _stream.Abort(QuicAbortDirection.Write, errorCode: 0);
 562            }
 63
 50264            _completeCallback();
 65
 50266            _pipe.Writer.Complete();
 50267            _pipe.Reader.Complete();
 50268        }
 80469    }
 70
 71    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) =>
 72        // WriteAsync will flush the internal buffer
 273        WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken);
 74
 275    public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint);
 76
 577    public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint);
 78
 79    public override ValueTask<FlushResult> WriteAsync(
 80        ReadOnlyMemory<byte> source,
 81        CancellationToken cancellationToken) =>
 559682        WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken);
 83
 84    public override async ValueTask<FlushResult> WriteAsync(
 85        ReadOnlySequence<byte> source,
 86        bool endStream,
 87        CancellationToken cancellationToken)
 560588    {
 560589        if (_isCompleted)
 090        {
 091            throw new InvalidOperationException("Writing is not allowed once the writer is completed.");
 92        }
 93
 560594        _throwIfConnectionClosedOrDisposed();
 95
 96        try
 560397        {
 560398            if (_pipe.Writer.UnflushedBytes > 0)
 599            {
 5100                if (!source.IsEmpty && source.Length < _minSegmentSize)
 3101                {
 102                    // When source fits in the last segment of _pipe.Writer, we copy it into _pipe.Writer.
 103
 3104                    Memory<byte> pipeMemory = _pipe.Writer.GetMemory();
 3105                    if (source.Length <= pipeMemory.Length)
 2106                    {
 2107                        source.CopyTo(pipeMemory.Span);
 2108                        _pipe.Writer.Advance((int)source.Length);
 2109                        source = ReadOnlySequence<byte>.Empty;
 2110                    }
 111                    else
 1112                    {
 1113                        _pipe.Writer.Advance(0);
 1114                    }
 3115                }
 116
 117                // Flush the internal pipe.
 5118                FlushResult flushResult = await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 5119                Debug.Assert(!flushResult.IsCanceled && !flushResult.IsCompleted);
 120
 121                // Read the data from the pipe.
 5122                bool tryReadOk = _pipe.Reader.TryRead(out ReadResult readResult);
 5123                Debug.Assert(tryReadOk);
 5124                Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0);
 125
 126                try
 5127                {
 128                    // Write buffered data to the stream
 5129                    await WriteSequenceAsync(
 5130                        readResult.Buffer,
 5131                        completeWrites: endStream && source.IsEmpty,
 5132                        cancellationToken).ConfigureAwait(false);
 4133                }
 134                finally
 5135                {
 5136                    _pipe.Reader.AdvanceTo(readResult.Buffer.End);
 5137                }
 138
 4139                if (source.IsEmpty)
 3140                {
 141                    // We're done, we don't want to write again an empty sequence.
 3142                    return new FlushResult(isCanceled: false, isCompleted: endStream);
 143                }
 1144            }
 145
 5599146            if (source.IsEmpty && !endStream)
 2147            {
 148                // Nothing to do; this typically corresponds to a call to FlushAsync when there was no unflushed bytes.
 2149                return new FlushResult(isCanceled: false, isCompleted: false);
 150            }
 151            else
 5597152            {
 5597153                await WriteSequenceAsync(source, completeWrites: endStream, cancellationToken).ConfigureAwait(false);
 5591154                return new FlushResult(isCanceled: false, isCompleted: endStream);
 155            }
 156        }
 6157        catch (QuicException exception) when (exception.QuicError == QuicError.StreamAborted)
 4158        {
 4159            return new FlushResult(isCanceled: false, isCompleted: true);
 160        }
 2161        catch (QuicException exception)
 2162        {
 2163            throw exception.ToIceRpcException();
 164        }
 0165        catch (SocketException exception)
 0166        {
 0167            throw exception.ToIceRpcException();
 168        }
 169        // We don't wrap other exceptions
 170
 171        async ValueTask WriteSequenceAsync(
 172            ReadOnlySequence<byte> sequence,
 173            bool completeWrites,
 174            CancellationToken cancellationToken)
 5602175        {
 5602176            if (sequence.IsSingleSegment)
 5600177            {
 5600178                await _stream.WriteAsync(sequence.First, completeWrites, cancellationToken).ConfigureAwait(false);
 5593179            }
 180            else
 2181            {
 2182                ReadOnlySequence<byte>.Enumerator enumerator = sequence.GetEnumerator();
 2183                bool hasMore = enumerator.MoveNext();
 2184                Debug.Assert(hasMore);
 185                do
 5186                {
 5187                    ReadOnlyMemory<byte> buffer = enumerator.Current;
 5188                    hasMore = enumerator.MoveNext();
 5189                    await _stream.WriteAsync(buffer, completeWrites: completeWrites && !hasMore, cancellationToken)
 5190                        .ConfigureAwait(false);
 5191                }
 5192                while (hasMore);
 2193            }
 5595194        }
 5600195    }
 196
 522197    internal QuicPipeWriter(
 522198        QuicStream stream,
 522199        MemoryPool<byte> pool,
 522200        int minSegmentSize,
 522201        Action completeCallback,
 522202        Action throwIfConnectionClosedOrDisposed)
 522203    {
 522204        _stream = stream;
 522205        _minSegmentSize = minSegmentSize;
 522206        _completeCallback = completeCallback;
 207
 208        // This callback is used to check if the connection is closed or disposed before calling WriteAsync on the QUIC
 209        // stream. This check works around the use of the QuicError.OperationAborted error code for both reporting the
 210        // abortion of the in-progress write call and for reporting a closed connection before the operation process
 211        // starts. In this latter case, we want to report ConnectionAborted.
 522212        _throwIfConnectionClosedOrDisposed = throwIfConnectionClosedOrDisposed;
 213
 214        // Create a pipe that never pauses on flush or write. The QUIC _stream.WriteAsync will block if the QUIC flow
 215        // control doesn't permit sending more data.
 216        // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't
 217        // matter (_pipe.Writer.FlushAsync never blocks).
 522218        _pipe = new(new PipeOptions(
 522219            pool: pool,
 522220            minimumSegmentSize: minSegmentSize,
 522221            pauseWriterThreshold: 0,
 522222            useSynchronizationContext: false));
 223
 522224        Closed = ClosedAsync();
 225
 226        async Task ClosedAsync()
 522227        {
 228            try
 522229            {
 522230                await _stream.WritesClosed.ConfigureAwait(false);
 418231            }
 104232            catch
 104233            {
 234                // Ignore failures.
 104235            }
 522236        }
 522237    }
 238}