< Summary

Information
Class: IceRpc.Protobuf.Internal.AsyncEnumerableExtensions
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/Internal/AsyncEnumerableExtensions.cs
Tag: 275_13775359185
Line coverage
94%
Covered lines: 107
Uncovered lines: 6
Coverable lines: 113
Total lines: 209
Line coverage: 94.6%
Branch coverage
89%
Covered branches: 25
Total branches: 28
Branch coverage: 89.2%
Method coverage
80%
Covered methods: 8
Total methods: 10
Method coverage: 80%

Metrics

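| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| ToPipeReader(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 2 | 2 | 100% |
| AdvanceTo(...) | 100% | 1 | 1 | 100% |
| AdvanceTo(...) | 100% | 2 | 1 | 0% |
| CancelPendingRead() | 100% | 1 | 1 | 100% |
| Complete(...) | 100% | 2 | 2 | 100% |
| DisposeEnumeratorAsync() | 100% | 2 | 2 | 91.66% |
| ReadAsync() | 85.71% | 14.08 | 14 | 92.5% |
| EncodeElements() | 87.5% | 8 | 8 | 100% |
| TryRead(...) | 100% | 2 | 1 | 0% |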

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/Internal/AsyncEnumerableExtensions.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using System.Buffers;
 5using System.Buffers.Binary;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Protobuf.Internal;
 10
 11/// <summary>Provides an extension method for <see cref="IAsyncEnumerable{T}" /> to encode elements into a <see
 12/// cref="PipeReader"/>.</summary>
 13internal static class AsyncEnumerableExtensions
 14{
 15    /// <summary>Encodes an async enumerable into a stream of bytes represented by a <see cref="PipeReader"/>.</summary>
 16    /// <typeparam name="T">The async enumerable element type.</typeparam>
 17    /// <param name="asyncEnumerable">The async enumerable to encode into a stream of bytes.</param>
 18    /// <param name="encodeOptions">The Protobuf encode options.</param>
 19    /// <returns>A pipe reader that represents the encoded stream of bytes.</returns>
 20    /// <remarks>This extension method is used to encode streaming parameters and streaming return values.</remarks>
 21    internal static PipeReader ToPipeReader<T>(
 22        this IAsyncEnumerable<T> asyncEnumerable,
 23        ProtobufEncodeOptions? encodeOptions = null) where T : IMessage<T> =>
 2624        new AsyncEnumerablePipeReader<T>(asyncEnumerable, encodeOptions);
 25
 26    // Overriding ReadAtLeastAsyncCore or CopyToAsync methods for this reader is not critical since this reader is
 27    // mostly used by the IceRPC core to copy the encoded data for the enumerable to the network stream. This copy
 28    // doesn't use these methods.
 29#pragma warning disable CA1001 // Types that own disposable fields should be disposable.
 30    private class AsyncEnumerablePipeReader<T> : PipeReader where T : IMessage<T>
 31#pragma warning restore CA1001
 32    {
 33        // Disposed in Complete.
 34        private readonly IAsyncEnumerator<T> _asyncEnumerator;
 35
 36        // We don't dispose _cts because it's not necessary
 37        // (see https://github.com/dotnet/runtime/issues/29970#issuecomment-717840778) and we can't easily dispose it
 38        // when no one is using it since CancelPendingRead can be called by another thread after Complete is called.
 2639        private readonly CancellationTokenSource _cts = new();
 40        private bool _isCompleted;
 41        private readonly int _streamFlushThreshold;
 42        private Task<bool>? _moveNext;
 43        private readonly Pipe _pipe;
 44
 13144945        public override void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 46
 47        public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 048            _pipe.Reader.AdvanceTo(consumed, examined);
 49
 50        public override void CancelPendingRead()
 151        {
 152            _pipe.Reader.CancelPendingRead();
 153            _cts.Cancel();
 154        }
 55
 56        public override void Complete(Exception? exception = null)
 2657        {
 2658            if (!_isCompleted)
 2259            {
 2260                _isCompleted = true;
 61
 62                // Cancel MoveNextAsync if it's still running.
 2263                _cts.Cancel();
 64
 2265                _pipe.Reader.Complete();
 2266                _pipe.Writer.Complete();
 67
 2268                _ = DisposeEnumeratorAsync();
 2269            }
 70
 71            async Task DisposeEnumeratorAsync()
 2272            {
 73                // Make sure MoveNextAsync is completed before disposing the enumerator. Calling DisposeAsync on the
 74                // enumerator while MoveNextAsync is still running is disallowed.
 2275                if (_moveNext is not null)
 276                {
 77                    try
 278                    {
 279                        _ = await _moveNext.ConfigureAwait(false);
 080                    }
 281                    catch
 282                    {
 283                    }
 284                }
 2285                await _asyncEnumerator.DisposeAsync().ConfigureAwait(false);
 2286            }
 2687        }
 88
 89        public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
 13146390        {
 13146391            if (!_pipe.Reader.TryRead(out ReadResult readResult))
 14792            {
 93                // If no more buffered data to read, fill the pipe with new data.
 94
 95                // If ReadAsync is canceled, cancel the enumerator iteration to ensure MoveNextAsync below completes.
 14796                using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(
 197                    cts => ((CancellationTokenSource)cts!).Cancel(),
 14798                    _cts);
 99
 100                bool hasNext;
 101                try
 147102                {
 147103                    if (_moveNext is null)
 21104                    {
 21105                        hasNext = await _asyncEnumerator.MoveNextAsync().ConfigureAwait(false);
 19106                    }
 107                    else
 126108                    {
 126109                        hasNext = await _moveNext.ConfigureAwait(false);
 125110                        _moveNext = null;
 125111                    }
 112
 144113                    if (hasNext && EncodeElements() is Task<bool> moveNext)
 127114                    {
 115                        // Flush does not block because the pipe is configured to not pause flush.
 127116                        ValueTask<FlushResult> valueTask = _pipe.Writer.FlushAsync(CancellationToken.None);
 127117                        Debug.Assert(valueTask.IsCompletedSuccessfully);
 118
 127119                        _moveNext = moveNext;
 120                        // And the next ReadAsync will await _moveNext.
 127121                    }
 122                    else
 17123                    {
 124                        // No need to flush the writer, complete takes care of it.
 17125                        _pipe.Writer.Complete();
 17126                    }
 127
 128                    // There are bytes in the reader or it's completed since we've just flushed or completed the writer.
 144129                    bool ok = _pipe.Reader.TryRead(out readResult);
 144130                    Debug.Assert(ok);
 144131                }
 2132                catch (OperationCanceledException exception)
 2133                {
 2134                    Debug.Assert(exception.CancellationToken == _cts.Token);
 2135                    cancellationToken.ThrowIfCancellationRequested();
 136
 1137                    if (_pipe.Reader.TryRead(out readResult) && readResult.IsCanceled)
 1138                    {
 139                        // Ok: return canceled readResult once after calling CancelPendingRead.
 140                        // Note that we can't return a canceled read result with a bogus buffer since the caller must
 141                        // be able to call reader.AdvanceTo with this buffer.
 1142                    }
 143                    else
 0144                    {
 0145                        throw new NotSupportedException(
 0146                            "Cannot resume reading an AsyncEnumerablePipeReader after canceling a ReadAsync or calling C
 147                    }
 1148                }
 145149            }
 150
 131461151            return readResult;
 152
 153            Task<bool>? EncodeElements()
 143154            {
 143155                Task<bool>? result = null;
 156                bool keepEncoding;
 143157                int written = 0;
 158                do
 65778159                {
 65778160                    _pipe.Writer.Write(new Span<byte>([0])); // Not compressed
 65778161                    Span<byte> lengthPlaceholder = _pipe.Writer.GetSpan(4)[..4];
 65778162                    _pipe.Writer.Advance(4);
 65778163                    _asyncEnumerator.Current.WriteTo(_pipe.Writer);
 65778164                    int length = checked((int)_pipe.Writer.UnflushedBytes - written);
 65778165                    written += length;
 65778166                    BinaryPrimitives.WriteInt32BigEndian(lengthPlaceholder, length - 5);
 65778167                    ValueTask<bool> moveNext = _asyncEnumerator.MoveNextAsync();
 168
 65778169                    if (moveNext.IsCompletedSuccessfully)
 65745170                    {
 65745171                        bool hasNext = moveNext.Result;
 172
 173                        // If we reached the stream flush threshold, it's time to flush.
 65745174                        if (written >= _streamFlushThreshold)
 94175                        {
 94176                            result = hasNext ? moveNext.AsTask() : null;
 94177                            keepEncoding = false;
 94178                        }
 179                        else
 65651180                        {
 65651181                            keepEncoding = hasNext;
 65651182                        }
 65745183                    }
 184                    else
 33185                    {
 186                        // If we can't get the next element synchronously, we return the move next task and end the loop
 187                        // to flush the encoded elements.
 33188                        result = moveNext.AsTask();
 33189                        keepEncoding = false;
 33190                    }
 65778191                }
 65778192                while (keepEncoding);
 143193                return result;
 143194            }
 131461195        }
 196
 0197        public override bool TryRead(out ReadResult result) => _pipe.Reader.TryRead(out result);
 198
 26199        internal AsyncEnumerablePipeReader(
 26200            IAsyncEnumerable<T> asyncEnumerable,
 26201            ProtobufEncodeOptions? encodeOptions)
 26202        {
 26203            encodeOptions ??= ProtobufEncodeOptions.Default;
 26204            _pipe = new Pipe(encodeOptions.PipeOptions);
 26205            _streamFlushThreshold = encodeOptions.StreamFlushThreshold;
 26206            _asyncEnumerator = asyncEnumerable.GetAsyncEnumerator(_cts.Token);
 26207        }
 208    }
 209}