< Summary

Information
Class: IceRpc.Protobuf.RpcMethods.Internal.AsyncStream<T>
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/Internal/AsyncStream.cs
Tag: 1856_27024993493
Line coverage
100%
Covered lines: 55
Uncovered lines: 0
Coverable lines: 55
Total lines: 137
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage
100%
Covered methods: 4
Fully covered methods: 4
Total methods: 4
Method coverage: 100%
Full method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
Dispose()100%44100%
GetAsyncEnumerator(...)100%22100%
EnumerateAsync()100%22100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/Internal/AsyncStream.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6using System.Runtime.CompilerServices;
 7
 8namespace IceRpc.Protobuf.RpcMethods.Internal;
 9
 10/// <summary>The default <see cref="IAsyncStream{T}" /> implementation. It wraps a <see cref="PipeReader" /> and
 11/// decodes its bytes into Protobuf messages of type <typeparamref name="T"/>.</summary>
 12internal sealed class AsyncStream<T> : IAsyncStream<T> where T : class, IMessage<T>
 13{
 14    private readonly PipeReader _reader;
 15    private readonly MessageParser<T> _messageParser;
 16    private readonly int _maxMessageLength;
 17
 18    // Canceled by Dispose when iteration has started, to unblock any pending ReadAsync.
 23219    private readonly CancellationTokenSource _disposeCts = new();
 20
 21    // Set when GetAsyncEnumerator is called. This enforces the single-enumerator contract even if the created
 22    // enumerator is never advanced.
 23    private bool _enumeratorCreated;
 24
 25    // Atomic state used to safely arbitrate ownership of _reader.Complete() between Dispose and the first
 26    // MoveNextAsync.
 27    private int _state;
 28
 29    public void Dispose()
 22630    {
 22631        int original = Interlocked.Exchange(ref _state, (int)State.Disposed);
 32
 22633        switch ((State)original)
 34        {
 35            case State.Initial:
 36                // No iteration could have started (and any future MoveNextAsync will see Disposed and throw).
 37                // Safe to complete the reader directly from this thread.
 10138                _reader.Complete();
 10139                _disposeCts.Dispose();
 10140                break;
 41
 42            case State.Iterating:
 43                // The iterator owns the reader; its finally will complete it. We only signal cancellation here.
 44                // We must not dispose _disposeCts here: a linked CTS inside the iterator may still hold a
 45                // registration on _disposeCts.Token.
 12346                _disposeCts.Cancel();
 12347                break;
 48
 49            case State.Disposed:
 50                // no-op (Dispose called more than once).
 251                break;
 52        }
 22653    }
 54
 55    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
 23156    {
 57        // We don't check for Disposed here: if the stream was disposed, the first MoveNextAsync call on the
 58        // returned enumerator throws ObjectDisposedException (see EnumerateAsync).
 23159        if (_enumeratorCreated)
 160        {
 161            throw new InvalidOperationException($"An {nameof(IAsyncStream<T>)} can only be enumerated once.");
 62        }
 23063        _enumeratorCreated = true;
 23064        return EnumerateAsync(cancellationToken).GetAsyncEnumerator(cancellationToken);
 23065    }
 66
 23267    internal AsyncStream(PipeReader reader, MessageParser<T> messageParser, int maxMessageLength)
 23268    {
 23269        _reader = reader;
 23270        _messageParser = messageParser;
 23271        _maxMessageLength = maxMessageLength;
 23272    }
 73
 74    private async IAsyncEnumerable<T> EnumerateAsync([EnumeratorCancellation] CancellationToken cancellationToken)
 22875    {
 76        // Because this async method returns an IAsyncEnumerable<T>, it only starts executing when the caller starts
 77        // iterating (calls MoveNextAsync on the enumerator). It does not execute when EnumerateAsync is called, or
 78        // even when GetAsyncEnumerator is called on the returned IAsyncEnumerable<T>.
 79
 80        // Atomically claim the reader (Idle -> Iterating). This races with Dispose's atomic transition to Disposed;
 81        // whichever transition wins from Idle owns _reader.Complete().
 22882        int original = Interlocked.CompareExchange(ref _state, (int)State.Iterating, (int)State.Initial);
 22883        ObjectDisposedException.ThrowIf(original == (int)State.Disposed, this);
 13184        Debug.Assert(original == (int)State.Initial); // _enumeratorCreated forbids a second iteration.
 85
 86        // Link the caller-provided token with our internal dispose token so that Dispose can unblock ReadAsync.
 13187        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
 13188            cancellationToken,
 13189            _disposeCts.Token);
 13190        CancellationToken linkedToken = linkedCts.Token;
 91
 92        try
 13193        {
 6591494            while (true)
 6591495            {
 96                T? message;
 97                try
 6591498                {
 6591499                    message = await _reader.ReadProtobufMessageAsync(
 65914100                        _messageParser,
 65914101                        _maxMessageLength,
 65914102                        linkedToken).ConfigureAwait(false);
 65805103                }
 108104                catch (OperationCanceledException) when (linkedToken.IsCancellationRequested)
 108105                {
 106                    // Re-issue the cancellation with the caller's token so the OCE that propagates carries the
 107                    // token the caller passed in (not our internal linkedToken). When dispose is the only source,
 108                    // surface dispose-mid-iteration as ObjectDisposedException.
 108109                    cancellationToken.ThrowIfCancellationRequested();
 110
 111                    // Safe to read _state without a barrier: Dispose writes State.Disposed before calling
 112                    // _disposeCts.Cancel(), and observing the cancellation here establishes happens-before
 113                    // with that write.
 105114                    Debug.Assert(_state == (int)State.Disposed);
 105115                    throw new ObjectDisposedException(nameof(AsyncStream<>), "The stream was disposed while reading.");
 116                }
 117
 65805118                if (message is null)
 20119                {
 20120                    yield break;
 121                }
 65785122                yield return message;
 65783123            }
 124        }
 125        finally
 131126        {
 131127            _reader.Complete();
 131128        }
 22129    }
 130
 131    private enum State
 132    {
 133        Initial = 0,
 134        Iterating = 1,
 135        Disposed = 2
 136    }
 137}