< Summary

Information
Class: IceRpc.Internal.PipeWriterExtensions
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/PipeWriterExtensions.cs
Tag: 701_22528036593
Line coverage
87%
Covered lines: 64
Uncovered lines: 9
Coverable lines: 73
Total lines: 154
Line coverage: 87.6%
Branch coverage
76%
Covered branches: 23
Total branches: 30
Branch coverage: 76.6%
Method coverage
100%
Covered methods: 5
Total methods: 5
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.cctor()100%11100%
CompleteOutput(...)100%22100%
CopyFromAsync()100%1414100%
WriteReadResultAsync()100%22100%
WriteAsync()41.66%18.721264%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/PipeWriterExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Internal;
 9
 10internal static class PipeWriterExtensions
 11{
 112    private static readonly Exception _outputCompleteException = new();
 13
 14    /// <summary>Completes the output provided by a <see cref="IMultiplexedStream" />.</summary>
 15    /// <param name="output">The output (a pipe writer).</param>
 16    /// <param name="success">When <see langword="true" />, the output is completed with a <see langword="null" />
 17    /// exception. Otherwise, it's completed with an exception. The exception used does not matter since Output behaves
 18    /// the same when completed with any exception.</param>
 19    internal static void CompleteOutput(this PipeWriter output, bool success)
 192620    {
 192621        if (success)
 186522        {
 186523            output.Complete(null);
 186524        }
 25        else
 6126        {
 6127            output.Complete(_outputCompleteException);
 6128        }
 192629    }
 30
 31    /// <summary>Copies the contents of a <see cref="PipeReader"/> into this <see cref="PipeWriter" />.</summary>
 32    /// <param name="writer">This pipe writer.</param>
 33    /// <param name="reader">The pipe reader to copy. This method does not complete it.</param>
 34    /// <param name="writesClosed">A task that completes when the writer can no longer write.</param>
 35    /// <param name="endStream">When <see langword="true" />, no more data will be written to the writer after the
 36    /// contents of the pipe reader.</param>
 37    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 38    /// <returns>The flush result. <see cref="FlushResult.IsCanceled" /> is <langword name="true"/> when the copying is
 39    /// interrupted by a call to <see cref="PipeReader.CancelPendingRead" /> on <paramref name="reader" />.</returns>
 40    internal static async ValueTask<FlushResult> CopyFromAsync(
 41        this PipeWriter writer,
 42        PipeReader reader,
 43        Task writesClosed,
 44        bool endStream,
 45        CancellationToken cancellationToken)
 183446    {
 47        FlushResult flushResult;
 183448        if (reader.TryRead(out ReadResult readResult))
 181849        {
 50            // We optimize for the very common situation where the all the reader bytes are available.
 181851            flushResult = await WriteReadResultAsync().ConfigureAwait(false);
 52
 181253            if (readResult.IsCompleted || flushResult.IsCanceled || flushResult.IsCompleted)
 180454            {
 180455                return flushResult;
 56            }
 857        }
 58
 59        // If the peer is no longer reading, we want to cancel the reading of the payload.
 1860        CancellationToken readToken = writesClosed.AsCancellationToken(cancellationToken);
 61
 62        do
 2263        {
 64            try
 2265            {
 2266                readResult = await reader.ReadAsync(readToken).ConfigureAwait(false);
 1167            }
 968            catch (OperationCanceledException exception) when (exception.CancellationToken == readToken)
 769            {
 770                cancellationToken.ThrowIfCancellationRequested();
 471                Debug.Assert(writesClosed.IsCompleted);
 72
 73                // This FlushAsync either throws an exception because the writer failed, or returns a completed
 74                // FlushResult.
 475                return await writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 76            }
 77            // we let other exceptions thrown by ReadAsync (including possibly an OperationCanceledException
 78            // thrown incorrectly) escape.
 79
 1180            flushResult = await WriteReadResultAsync().ConfigureAwait(false);
 1181        }
 1182        while (!readResult.IsCompleted && !flushResult.IsCanceled && !flushResult.IsCompleted);
 83
 784        return flushResult;
 85
 86        async ValueTask<FlushResult> WriteReadResultAsync()
 182987        {
 182988            if (readResult.IsCanceled)
 289            {
 90                // The application (or an interceptor/middleware) called CancelPendingRead on reader.
 291                reader.AdvanceTo(readResult.Buffer.Start); // Did not consume any byte in reader.
 92
 93                // The copy was canceled.
 294                return new FlushResult(isCanceled: true, isCompleted: true);
 95            }
 96            else
 182797            {
 98                try
 182799                {
 1827100                    return await writer.WriteAsync(
 1827101                        readResult.Buffer,
 1827102                        readResult.IsCompleted && endStream,
 1827103                        cancellationToken).ConfigureAwait(false);
 104                }
 105                finally
 1827106                {
 1827107                    reader.AdvanceTo(readResult.Buffer.End);
 1827108                }
 109            }
 1823110        }
 1814111    }
 112
 113    /// <summary>Writes a read only sequence of bytes to this writer.</summary>
 114    /// <param name="writer">The pipe writer.</param>
 115    /// <param name="source">The source sequence.</param>
 116    /// <param name="endStream">When <see langword="true" />, no more data will be written to the writer.</param>
 117    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 118    /// <returns>The flush result.</returns>
 119    private static async ValueTask<FlushResult> WriteAsync(
 120        this PipeWriter writer,
 121        ReadOnlySequence<byte> source,
 122        bool endStream,
 123        CancellationToken cancellationToken)
 1827124    {
 1827125        if (writer is ReadOnlySequencePipeWriter readOnlySequenceWriter)
 1817126        {
 1817127            return await readOnlySequenceWriter.WriteAsync(source, endStream, cancellationToken).ConfigureAwait(false);
 128        }
 129        else
 10130        {
 10131            FlushResult flushResult = default;
 10132            if (source.IsEmpty)
 4133            {
 4134                flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 3135            }
 6136            else if (source.IsSingleSegment)
 6137            {
 6138                flushResult = await writer.WriteAsync(source.First, cancellationToken).ConfigureAwait(false);
 6139            }
 140            else
 0141            {
 0142                foreach (ReadOnlyMemory<byte> buffer in source)
 0143                {
 0144                    flushResult = await writer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
 0145                    if (flushResult.IsCompleted || flushResult.IsCanceled)
 0146                    {
 0147                        break;
 148                    }
 0149                }
 0150            }
 9151            return flushResult;
 152        }
 1821153    }
 154}