| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using System.Diagnostics; |
| | 4 | | using System.IO.Pipelines; |
| | 5 | | using ZeroC.Slice; |
| | 6 | |
|
| | 7 | | namespace IceRpc.Slice; |
| | 8 | |
|
| | 9 | | /// <summary>Provides an extension method for <see cref="IAsyncEnumerable{T}" /> to encode elements into a <see |
| | 10 | | /// cref="PipeReader"/>.</summary> |
| | 11 | | public static class AsyncEnumerableExtensions |
| | 12 | | { |
/// <summary>Creates a <see cref="PipeReader"/> whose bytes are the Slice-encoded elements of an async enumerable.
/// </summary>
/// <typeparam name="T">The type of the elements produced by the async enumerable.</typeparam>
/// <param name="asyncEnumerable">The async enumerable whose elements are encoded.</param>
/// <param name="encodeAction">The action that encodes a single element.</param>
/// <param name="useSegments"><see langword="true" /> when the number of bytes used to encode an element is
/// variable; <see langword="false" /> otherwise.</param>
/// <param name="encoding">The Slice encoding used to encode the elements.</param>
/// <param name="encodeOptions">The Slice encode options. When <see langword="null" />, the default options are
/// used.</param>
/// <returns>A pipe reader over the encoded elements.</returns>
/// <exception cref="NotSupportedException">Thrown when <paramref name="encoding" /> is
/// <see cref="SliceEncoding.Slice1" />.</exception>
/// <remarks>Streaming parameters and streaming return values are encoded with this extension method when using the
/// Slice2 encoding.</remarks>
public static PipeReader ToPipeReader<T>(
    this IAsyncEnumerable<T> asyncEnumerable,
    EncodeAction<T> encodeAction,
    bool useSegments,
    SliceEncoding encoding = SliceEncoding.Slice2,
    SliceEncodeOptions? encodeOptions = null)
{
    return new AsyncEnumerablePipeReader<T>(asyncEnumerable, encodeAction, useSegments, encoding, encodeOptions);
}
| | 36 | |
|
| | 37 | | // Overriding ReadAtLeastAsyncCore or CopyToAsync methods for this reader is not critical since this reader is |
| | 38 | | // mostly used by the IceRPC core to copy the encoded data for the enumerable to the network stream. This copy |
| | 39 | | // doesn't use these methods. |
| | 40 | | #pragma warning disable CA1001 // Types that own disposable fields should be disposable. |
| | 41 | | private class AsyncEnumerablePipeReader<T> : PipeReader |
| | 42 | | #pragma warning restore CA1001 |
| | 43 | | { |
// --- Read-only configuration and resources ---

// The enumerator over the elements to encode. Disposed (asynchronously) by Complete.
private readonly IAsyncEnumerator<T> _asyncEnumerator;

// The source of the token given to GetAsyncEnumerator. It is canceled by CancelPendingRead, by Complete and when
// the cancellation token given to ReadAsync is canceled.
// We don't dispose _cts because it's not necessary
// (see https://github.com/dotnet/runtime/issues/29970#issuecomment-717840778) and we can't easily dispose it
// when no one is using it since CancelPendingRead can be called by another thread after Complete is called.
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

// The action that encodes one element.
private readonly EncodeAction<T> _encodeAction;

// The Slice encoding given to each SliceEncoder.
private readonly SliceEncoding _encoding;

// The internal pipe: ReadAsync writes the encoded elements to its writer and returns them from its reader.
private readonly Pipe _pipe;

// Once a batch of encoded elements reaches this number of bytes, the batch ends and is flushed.
private readonly int _streamFlushThreshold;

// When true, each batch of encoded elements is prefixed with its size.
private readonly bool _useSegments;

// --- Mutable state ---

// Set by the first call to Complete; makes the following calls no-ops.
private bool _isCompleted;

// The MoveNextAsync call started while encoding the previous batch, if any. It is awaited by the next ReadAsync
// and by Complete before the enumerator is disposed.
private Task<bool>? _moveNext;
| | 58 | |
|
// Forwards to the reader of the internal pipe.
public override void AdvanceTo(SequencePosition consumed)
{
    _pipe.Reader.AdvanceTo(consumed);
}

// Forwards to the reader of the internal pipe.
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
    _pipe.Reader.AdvanceTo(consumed, examined);
}
| | 63 | |
|
// Cancels a pending read in two steps: first on the reader of the internal pipe, then on the enumerator
// iteration by canceling _cts, the source of the token given to GetAsyncEnumerator.
public override void CancelPendingRead()
{
    // Marks the internal pipe's reader as canceled.
    _pipe.Reader.CancelPendingRead();

    // Requests cancellation of the token the enumerator was created with.
    _cts.Cancel();
}
| | 69 | |
|
// Completes this reader. Only the first call does anything: it cancels the enumerator iteration, completes both
// ends of the internal pipe and starts disposing the enumerator without waiting for this disposal to finish.
// The exception argument is not used.
public override void Complete(Exception? exception = null)
{
    if (_isCompleted)
    {
        return;
    }

    _isCompleted = true;

    // Cancel MoveNextAsync if it's still running.
    _cts.Cancel();

    _pipe.Reader.Complete();
    _pipe.Writer.Complete();

    // Not awaited: Complete is synchronous and the disposal may have to wait for a pending MoveNextAsync.
    _ = DisposeEnumeratorAsync();

    async Task DisposeEnumeratorAsync()
    {
        // Calling DisposeAsync on the enumerator while MoveNextAsync is still running is disallowed, so wait for
        // the pending MoveNextAsync (if any) to finish first. Its outcome, including any exception, is ignored.
        if (_moveNext is Task<bool> pendingMoveNext)
        {
            try
            {
                _ = await pendingMoveNext.ConfigureAwait(false);
            }
            catch
            {
            }
        }

        await _asyncEnumerator.DisposeAsync().ConfigureAwait(false);
    }
}
| | 102 | |
|
// Returns the data buffered in the internal pipe when there is some. Otherwise, waits for the next element of the
// enumerator, encodes as many elements as are synchronously available (up to the stream flush threshold) into
// the internal pipe and returns the resulting data. The writer of the internal pipe is completed once the
// enumerator has no more elements.
//
// Throws OperationCanceledException when cancellationToken is canceled, and NotSupportedException when reading
// resumes after a canceled ReadAsync or after the canceled read result produced by CancelPendingRead was
// returned.
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
    if (!_pipe.Reader.TryRead(out ReadResult readResult))
    {
        // If no more buffered data to read, fill the pipe with new data.

        // If ReadAsync is canceled, cancel the enumerator iteration to ensure MoveNextAsync below completes.
        using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(
            cts => ((CancellationTokenSource)cts!).Cancel(),
            _cts);

        bool hasNext;
        try
        {
            if (_moveNext is null)
            {
                // No MoveNextAsync call is pending: start one.
                hasNext = await _asyncEnumerator.MoveNextAsync().ConfigureAwait(false);
            }
            else
            {
                // Wait for the MoveNextAsync call started while encoding the previous batch.
                hasNext = await _moveNext.ConfigureAwait(false);
                _moveNext = null;
            }

            if (hasNext && EncodeElements() is Task<bool> moveNext)
            {
                // Flush does not block because the pipe is configured to not pause flush.
                ValueTask<FlushResult> valueTask = _pipe.Writer.FlushAsync(CancellationToken.None);
                Debug.Assert(valueTask.IsCompletedSuccessfully);

                _moveNext = moveNext;
                // And the next ReadAsync will await _moveNext.
            }
            else
            {
                // The enumerator has no more elements.
                // No need to flush the writer, complete takes care of it.
                _pipe.Writer.Complete();
            }

            // There are bytes in the reader or it's completed since we've just flushed or completed the writer.
            bool ok = _pipe.Reader.TryRead(out readResult);
            Debug.Assert(ok);
        }
        catch (OperationCanceledException exception)
        {
            Debug.Assert(exception.CancellationToken == _cts.Token);

            // _cts is canceled either because cancellationToken was canceled (reported to the caller as a
            // cancellation of this call) or because CancelPendingRead or Complete was called.
            cancellationToken.ThrowIfCancellationRequested();

            if (_pipe.Reader.TryRead(out readResult) && readResult.IsCanceled)
            {
                // Ok: return canceled readResult once after calling CancelPendingRead.
                // Note that we can't return a canceled read result with a bogus buffer since the caller must
                // be able to call reader.AdvanceTo with this buffer.
            }
            else
            {
                throw new NotSupportedException(
                    "Cannot resume reading an AsyncEnumerablePipeReader after canceling a ReadAsync or calling CancelPendingRead.");
            }
        }
    }

    return readResult;

    // Encodes the current element and the following elements for as long as they are synchronously available
    // and the stream flush threshold is not reached. With segments, the encoded elements are prefixed with their
    // total size. Returns the task for the next element, or null when the enumerator has no more elements.
    Task<bool>? EncodeElements()
    {
        var encoder = new SliceEncoder(_pipe.Writer, _encoding);

        Span<byte> sizePlaceholder = default;
        if (_useSegments)
        {
            // Reserve 4 bytes for the segment size, filled in once all the elements are encoded.
            sizePlaceholder = encoder.GetPlaceholderSpan(4);
        }

        Task<bool>? result = null;
        bool keepEncoding;

        do
        {
            _encodeAction(ref encoder, _asyncEnumerator.Current);
            ValueTask<bool> moveNext = _asyncEnumerator.MoveNextAsync();

            if (moveNext.IsCompletedSuccessfully)
            {
                bool hasNext = moveNext.Result;

                // If we reached the stream flush threshold, it's time to flush.
                if (encoder.EncodedByteCount - sizePlaceholder.Length >= _streamFlushThreshold)
                {
                    // moveNext was consumed by reading Result above and a ValueTask must not be consumed a
                    // second time (for example with AsTask), so return an equivalent completed task instead.
                    result = hasNext ? Task.FromResult(true) : null;
                    keepEncoding = false;
                }
                else
                {
                    keepEncoding = hasNext;
                }
            }
            else
            {
                // If we can't get the next element synchronously, we return the move next task and end the loop
                // to flush the encoded elements.
                result = moveNext.AsTask();
                keepEncoding = false;
            }
        }
        while (keepEncoding);

        if (_useSegments)
        {
            // The segment size doesn't include the size placeholder itself.
            SliceEncoder.EncodeVarUInt62(
                (ulong)(encoder.EncodedByteCount - sizePlaceholder.Length),
                sizePlaceholder);
        }
        return result;
    }
}
| | 219 | |
|
// Forwards to the reader of the internal pipe; it does not read from the enumerator.
public override bool TryRead(out ReadResult result)
{
    return _pipe.Reader.TryRead(out result);
}
| | 221 | |
|
// Constructs a pipe reader over the encoded elements of asyncEnumerable. Throws NotSupportedException when
// encoding is Slice1. When encodeOptions is null, SliceEncodeOptions.Default is used.
internal AsyncEnumerablePipeReader(
    IAsyncEnumerable<T> asyncEnumerable,
    EncodeAction<T> encodeAction,
    bool useSegments,
    SliceEncoding encoding,
    SliceEncodeOptions? encodeOptions)
{
    if (encoding is SliceEncoding.Slice1)
    {
        throw new NotSupportedException("Streaming is not supported by the Slice1 encoding.");
    }

    _useSegments = useSegments;
    _encoding = encoding;
    _encodeAction = encodeAction;

    SliceEncodeOptions options = encodeOptions ?? SliceEncodeOptions.Default;
    _pipe = new Pipe(options.PipeOptions);
    _streamFlushThreshold = options.StreamFlushThreshold;

    // The enumerator observes _cts, which allows CancelPendingRead, Complete and a canceled ReadAsync to cancel
    // the iteration.
    _asyncEnumerator = asyncEnumerable.GetAsyncEnumerator(_cts.Token);
}
| | 242 | | } |
| | 243 | | } |