< Summary

Information
Class: IceRpc.Slice.Operations.AsyncEnumerableExtensions
Assembly: IceRpc.Slice
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/AsyncEnumerableExtensions.cs
Tag: 1321_24790053727
Line coverage
95%
Covered lines: 120
Uncovered lines: 6
Coverable lines: 126
Total lines: 230
Line coverage: 95.2%
Branch coverage
93%
Covered branches: 30
Total branches: 32
Branch coverage: 93.7%
Method coverage
80%
Covered methods: 8
Fully covered methods: 4
Total methods: 10
Method coverage: 80%
Full method coverage: 40%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
ToPipeReader(...)100%11100%
.ctor(...)100%22100%
AdvanceTo(...)100%11100%
AdvanceTo(...)100%210%
CancelPendingRead()100%11100%
Complete(...)100%22100%
DisposeEnumeratorAsync()100%2291.66%
ReadAsync()85.71%141492.5%
EncodeElements()100%1212100%
TryRead(...)100%210%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/AsyncEnumerableExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.IO.Pipelines;
 5using ZeroC.Slice.Codec;
 6
 7namespace IceRpc.Slice.Operations;
 8
 9/// <summary>Provides an extension method for <see cref="IAsyncEnumerable{T}" /> to encode elements into a <see
 10/// cref="PipeReader"/>.</summary>
 11public static class AsyncEnumerableExtensions
 12{
 13    /// <summary>Encodes an async enumerable into a stream of bytes represented by a <see cref="PipeReader"/>.</summary>
 14    /// <typeparam name="T">The async enumerable element type.</typeparam>
 15    /// <param name="asyncEnumerable">The async enumerable to encode into a stream of bytes.</param>
 16    /// <param name="encodeAction">The action used to encode one element.</param>
 17    /// <param name="useSegments"><see langword="true" /> if an element can be encoded on a variable number of bytes;
 18    /// otherwise, <see langword="false" />.</param>
 19    /// <param name="encodeOptions">The Slice encode options.</param>
 20    /// <returns>A pipe reader that represents the encoded stream of bytes.</returns>
 21    public static PipeReader ToPipeReader<T>(
 22        this IAsyncEnumerable<T> asyncEnumerable,
 23        EncodeAction<T> encodeAction,
 24        bool useSegments,
 25        SliceEncodeOptions? encodeOptions = null) =>
 2526        new AsyncEnumerablePipeReader<T>(
 2527            asyncEnumerable,
 2528            encodeAction,
 2529            useSegments,
 2530            encodeOptions);
 31
 32    // Overriding ReadAtLeastAsyncCore or CopyToAsync methods for this reader is not critical since this reader is
 33    // mostly used by the IceRPC core to copy the encoded data for the enumerable to the network stream. This copy
 34    // doesn't use these methods.
 35#pragma warning disable CA1001 // Types that own disposable fields should be disposable.
 36    private class AsyncEnumerablePipeReader<T> : PipeReader
 37#pragma warning restore CA1001
 38    {
 39        // Disposed in Complete.
 40        private readonly IAsyncEnumerator<T> _asyncEnumerator;
 41
 42        // We don't dispose _cts because it's not necessary
 43        // (see https://github.com/dotnet/runtime/issues/29970#issuecomment-717840778) and we can't easily dispose it
 44        // when no one is using it since CancelPendingRead can be called by another thread after Complete is called.
 2545        private readonly CancellationTokenSource _cts = new();
 46        private readonly EncodeAction<T> _encodeAction;
 47        private bool _isCompleted;
 48        private readonly bool _useSegments;
 49        private readonly int _streamFlushThreshold;
 50        private Task<bool>? _moveNext;
 51        private readonly Pipe _pipe;
 52
 11253        public override void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 54
 55        public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 056            _pipe.Reader.AdvanceTo(consumed, examined);
 57
 58        public override void CancelPendingRead()
 159        {
 160            _pipe.Reader.CancelPendingRead();
 161            _cts.Cancel();
 162        }
 63
 64        public override void Complete(Exception? exception = null)
 2665        {
 2666            if (!_isCompleted)
 1967            {
 1968                _isCompleted = true;
 69
 70                // Cancel MoveNextAsync if it's still running.
 1971                _cts.Cancel();
 72
 1973                _pipe.Reader.Complete();
 1974                _pipe.Writer.Complete();
 75
 1976                _ = DisposeEnumeratorAsync();
 1977            }
 78
 79            async Task DisposeEnumeratorAsync()
 1980            {
 81                // Make sure MoveNextAsync is completed before disposing the enumerator. Calling DisposeAsync on the
 82                // enumerator while MoveNextAsync is still running is disallowed.
 1983                if (_moveNext is not null)
 284                {
 85                    try
 286                    {
 287                        _ = await _moveNext.ConfigureAwait(false);
 088                    }
 289                    catch
 290                    {
 291                    }
 292                }
 1993                await _asyncEnumerator.DisposeAsync().ConfigureAwait(false);
 1994            }
 2695        }
 96
 97        public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
 11598        {
 11599            if (!_pipe.Reader.TryRead(out ReadResult readResult))
 115100            {
 101                // If no more buffered data to read, fill the pipe with new data.
 102
 103                // If ReadAsync is canceled, cancel the enumerator iteration to ensure MoveNextAsync below completes.
 115104                using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(
 1105                    cts => ((CancellationTokenSource)cts!).Cancel(),
 115106                    _cts);
 107
 108                bool hasNext;
 109                try
 115110                {
 115111                    if (_moveNext is null)
 24112                    {
 24113                        hasNext = await _asyncEnumerator.MoveNextAsync().ConfigureAwait(false);
 22114                    }
 115                    else
 91116                    {
 91117                        hasNext = await _moveNext.ConfigureAwait(false);
 90118                        _moveNext = null;
 90119                    }
 120
 112121                    if (hasNext && EncodeElements() is Task<bool> moveNext)
 92122                    {
 123                        // Flush does not block because the pipe is configured to not pause flush.
 92124                        ValueTask<FlushResult> valueTask = _pipe.Writer.FlushAsync(CancellationToken.None);
 92125                        Debug.Assert(valueTask.IsCompletedSuccessfully);
 126
 92127                        _moveNext = moveNext;
 128                        // And the next ReadAsync will await _moveNext.
 92129                    }
 130                    else
 20131                    {
 132                        // No need to flush the writer, complete takes care of it.
 20133                        _pipe.Writer.Complete();
 20134                    }
 135
 136                    // There are bytes in the reader or it's completed since we've just flushed or completed the writer.
 112137                    bool ok = _pipe.Reader.TryRead(out readResult);
 112138                    Debug.Assert(ok);
 112139                }
 2140                catch (OperationCanceledException exception)
 2141                {
 2142                    Debug.Assert(exception.CancellationToken == _cts.Token);
 2143                    cancellationToken.ThrowIfCancellationRequested();
 144
 1145                    if (_pipe.Reader.TryRead(out readResult) && readResult.IsCanceled)
 1146                    {
 147                        // Ok: return canceled readResult once after calling CancelPendingRead.
 148                        // Note that we can't return a canceled read result with a bogus buffer since the caller must
 149                        // be able to call reader.AdvanceTo with this buffer.
 1150                    }
 151                    else
 0152                    {
 0153                        throw new NotSupportedException(
 0154                            "Cannot resume reading an AsyncEnumerablePipeReader after canceling a ReadAsync or calling C
 155                    }
 1156                }
 113157            }
 158
 113159            return readResult;
 160
 161            Task<bool>? EncodeElements()
 110162            {
 110163                var encoder = new SliceEncoder(_pipe.Writer);
 164
 110165                Span<byte> sizePlaceholder = default;
 110166                if (_useSegments)
 72167                {
 72168                    sizePlaceholder = encoder.GetPlaceholderSpan(4);
 72169                }
 170
 110171                Task<bool>? result = null;
 172                bool keepEncoding;
 173
 174                do
 131337175                {
 131337176                    _encodeAction(ref encoder, _asyncEnumerator.Current);
 131337177                    ValueTask<bool> moveNext = _asyncEnumerator.MoveNextAsync();
 178
 131337179                    if (moveNext.IsCompletedSuccessfully)
 131307180                    {
 131307181                        bool hasNext = moveNext.Result;
 182
 183                        // If we reached the stream flush threshold, it's time to flush.
 131307184                        if (encoder.EncodedByteCount - sizePlaceholder.Length >= _streamFlushThreshold)
 63185                        {
 63186                            result = hasNext ? moveNext.AsTask() : null;
 63187                            keepEncoding = false;
 63188                        }
 189                        else
 131244190                        {
 131244191                            keepEncoding = hasNext;
 131244192                        }
 131307193                    }
 194                    else
 30195                    {
 196                        // If we can't get the next element synchronously, we return the move next task and end the loop
 197                        // to flush the encoded elements.
 30198                        result = moveNext.AsTask();
 30199                        keepEncoding = false;
 30200                    }
 131337201                }
 131337202                while (keepEncoding);
 203
 110204                if (_useSegments)
 72205                {
 72206                    SliceEncoder.EncodeVarUInt62(
 72207                        (ulong)(encoder.EncodedByteCount - sizePlaceholder.Length),
 72208                        sizePlaceholder);
 72209                }
 110210                return result;
 110211            }
 113212        }
 213
 0214        public override bool TryRead(out ReadResult result) => _pipe.Reader.TryRead(out result);
 215
 25216        internal AsyncEnumerablePipeReader(
 25217            IAsyncEnumerable<T> asyncEnumerable,
 25218            EncodeAction<T> encodeAction,
 25219            bool useSegments,
 25220            SliceEncodeOptions? encodeOptions)
 25221        {
 25222            encodeOptions ??= SliceEncodeOptions.Default;
 25223            _pipe = new Pipe(encodeOptions.PipeOptions);
 25224            _streamFlushThreshold = encodeOptions.StreamFlushThreshold;
 25225            _encodeAction = encodeAction;
 25226            _useSegments = useSegments;
 25227            _asyncEnumerator = asyncEnumerable.GetAsyncEnumerator(_cts.Token);
 25228        }
 229    }
 230}