< Summary

Information
Class: IceRpc.Internal.PipeWriterExtensions
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/PipeWriterExtensions.cs
Tag: 275_13775359185
Line coverage
87%
Covered lines: 64
Uncovered lines: 9
Coverable lines: 73
Total lines: 154
Line coverage: 87.6%
Branch coverage
76%
Covered branches: 23
Total branches: 30
Branch coverage: 76.6%
Method coverage
100%
Covered methods: 5
Total methods: 5
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.cctor() | 100% | 1 | 1 | 100%
CompleteOutput(...) | 100% | 2 | 2 | 100%
CopyFromAsync() | 100% | 14 | 14 | 100%
WriteReadResultAsync() | 100% | 2 | 2 | 100%
WriteAsync() | 41.66% | 18.72 | 12 | 64%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/PipeWriterExtensions.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7
 8namespace IceRpc.Internal;
 9
 10internal static class PipeWriterExtensions
 11{
 212    private static readonly Exception _outputCompleteException = new();
 13
 14    /// <summary>Completes the output provided by a <see cref="IMultiplexedStream" />.</summary>
 15    /// <param name="output">The output (a pipe writer).</param>
 16    /// <param name="success">When <see langword="true" />, the output is completed with a <see langword="null" />
 17    /// exception. Otherwise, it's completed with a shared static exception instance. The exception used does not matter
 18    /// since Output behaves the same when completed with any exception.</param>
 19    internal static void CompleteOutput(this PipeWriter output, bool success)
 373920    {
 373921        if (success)
 361122        {
 361123            output.Complete(null);
 361124        }
 25        else
 12826        {
 12827            output.Complete(_outputCompleteException);
 12828        }
 373929    }
 30
 31    /// <summary>Copies the contents of a <see cref="PipeReader"/> into this <see cref="PipeWriter" />.</summary>
 32    /// <param name="writer">This pipe writer.</param>
 33    /// <param name="reader">The pipe reader to copy. This method does not complete it.</param>
 34    /// <param name="writesClosed">A task that completes when the writer can no longer write.</param>
 35    /// <param name="endStream">When <see langword="true" />, no more data will be written to the writer after the
 36    /// contents of the pipe reader.</param>
 37    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 38    /// <returns>The flush result. <see cref="FlushResult.IsCanceled" /> is <see langword="true" /> when the copying is
 39    /// interrupted by a call to <see cref="PipeReader.CancelPendingRead" /> on <paramref name="reader" />.</returns>
 40    internal static async ValueTask<FlushResult> CopyFromAsync(
 41        this PipeWriter writer,
 42        PipeReader reader,
 43        Task writesClosed,
 44        bool endStream,
 45        CancellationToken cancellationToken)
 356646    {
 47        FlushResult flushResult;
 356648        if (reader.TryRead(out ReadResult readResult))
 353449        {
 50            // We optimize for the very common situation where all the reader bytes are already available.
 353451            flushResult = await WriteReadResultAsync().ConfigureAwait(false);
 52
 352153            if (readResult.IsCompleted || flushResult.IsCanceled || flushResult.IsCompleted)
 350654            {
 350655                return flushResult;
 56            }
 1557        }
 58
 59        // If the peer is no longer reading, we want to cancel the reading of the payload. NOTE(review): readToken is presumably canceled when writesClosed completes or cancellationToken is canceled - confirm against AsCancellationToken.
 3560        CancellationToken readToken = writesClosed.AsCancellationToken(cancellationToken);
 61
 62        do
 4363        {
 64            try
 4365            {
 4366                readResult = await reader.ReadAsync(readToken).ConfigureAwait(false);
 2267            }
 1768            catch (OperationCanceledException exception) when (exception.CancellationToken == readToken)
 1369            {
 1370                cancellationToken.ThrowIfCancellationRequested();
 771                Debug.Assert(writesClosed.IsCompleted);
 72
 73                // This FlushAsync either throws an exception because the writer failed, or returns a completed
 74                // FlushResult.
 775                return await writer.FlushAsync(CancellationToken.None).ConfigureAwait(false);
 76            }
 77            // We let other exceptions thrown by ReadAsync (including possibly an OperationCanceledException
 78            // thrown incorrectly) escape.
 79
 2280            flushResult = await WriteReadResultAsync().ConfigureAwait(false);
 2281        }
 2282        while (!readResult.IsCompleted && !flushResult.IsCanceled && !flushResult.IsCompleted);
 83
 1484        return flushResult;
 85
 86        async ValueTask<FlushResult> WriteReadResultAsync()
 355687        {
 355688            if (readResult.IsCanceled)
 489            {
 90                // The application (or an interceptor/middleware) called CancelPendingRead on reader.
 491                reader.AdvanceTo(readResult.Buffer.Start); // Did not consume any byte in reader.
 92
 93                // The copy was canceled.
 494                return new FlushResult(isCanceled: true, isCompleted: true);
 95            }
 96            else
 355297            {
 98                try
 355299                {
 3552100                    return await writer.WriteAsync(
 3552101                        readResult.Buffer,
 3552102                        readResult.IsCompleted && endStream,
 3552103                        cancellationToken).ConfigureAwait(false);
 104                }
 105                finally
 3552106                {
 3552107                    reader.AdvanceTo(readResult.Buffer.End);
 3552108                }
 109            }
 3543110        }
 3525111    }
 112
 113    /// <summary>Writes a read-only sequence of bytes to this writer. A <see cref="ReadOnlySequencePipeWriter" /> receives the whole sequence in a single call; any other writer is flushed when the source is empty and otherwise written to segment by segment.</summary>
 114    /// <param name="writer">The pipe writer.</param>
 115    /// <param name="source">The source sequence.</param>
 116    /// <param name="endStream">When <see langword="true" />, no more data will be written to the writer. This value is only forwarded to a <see cref="ReadOnlySequencePipeWriter" />; the fallback path does not use it.</param>
 117    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 118    /// <returns>The flush result of the delegated write, or of the last flush or write performed on the fallback path.</returns>
 119    private static async ValueTask<FlushResult> WriteAsync(
 120        this PipeWriter writer,
 121        ReadOnlySequence<byte> source,
 122        bool endStream,
 123        CancellationToken cancellationToken)
 3552124    {
 3552125        if (writer is ReadOnlySequencePipeWriter readOnlySequenceWriter)
 3536126        {
 3536127            return await readOnlySequenceWriter.WriteAsync(source, endStream, cancellationToken).ConfigureAwait(false);
 128        }
 129        else
 16130        {
 16131            FlushResult flushResult = default;
 16132            if (source.IsEmpty)
 8133            {
 8134                flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 6135            }
 8136            else if (source.IsSingleSegment)
 8137            {
 8138                flushResult = await writer.WriteAsync(source.First, cancellationToken).ConfigureAwait(false);
 8139            }
 140            else
 0141            {
 0142                foreach (ReadOnlyMemory<byte> buffer in source)
 0143                {
 0144                    flushResult = await writer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
 0145                    if (flushResult.IsCompleted || flushResult.IsCanceled)
 0146                    {
 0147                        break;
 148                    }
 0149                }
 0150            }
 14151            return flushResult;
 152        }
 3539153    }
 154}