| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using System.Buffers; |
| | 4 | | using System.Diagnostics; |
| | 5 | | using System.IO.Pipelines; |
| | 6 | |
|
| | 7 | | namespace IceRpc; |
| | 8 | |
|
/// <summary>Represents a <see cref="PipeReader" /> decorator that doesn't consume the data from the decoratee to allow
/// reading again this data from the beginning after being reset.</summary>
/// <remarks><para>The decorator becomes non-resettable if the decoratee's buffered data exceeds the maximum buffer size
/// provided to <see cref="ResettablePipeReaderDecorator(PipeReader, int)" /> or if the reading from the decoratee fails
/// with an exception other than <see cref="OperationCanceledException"/>.</para>
/// <para>Calling <see cref="Complete" /> on the decorator doesn't complete the decoratee to allow reading again the
/// data after the decorator is reset. It's therefore important to make the decorator non-resettable by setting <see
/// cref="IsResettable" /> to <see langword="false" /> to complete the decoratee.</para></remarks>
// The default CopyToAsync implementation is suitable for this reader implementation. It calls ReadAsync/AdvanceTo to
// read the data. This ensures that the decorated pipe reader buffered data is not consumed.
public sealed class ResettablePipeReaderDecorator : PipeReader
{
    /// <summary>Gets or sets a value indicating whether this decorator can be reset.</summary>
    /// <value><see langword="true"/> if this decorator can be reset; otherwise, <see langword="false"/>. Defaults to
    /// <see langword="true"/>.</value>
    /// <remarks>This property can only be set to <see langword="false" />. If <see cref="IsResettable"/> is <see
    /// langword="true" /> and <see cref="Complete" /> was called, setting this property to <see langword="false" />
    /// completes the decoratee.</remarks>
    /// <exception cref="ArgumentException">Thrown if the value is <see langword="true" />.</exception>
    public bool IsResettable
    {
        get => _isResettable;

        set
        {
            if (value)
            {
                throw new ArgumentException(
                    $"The {nameof(IsResettable)} property cannot be set to true.",
                    nameof(value));
            }

            if (_isResettable)
            {
                // Commit the pending examined position (if any) on the decoratee while we are still resettable.
                AdvanceDecoratee();

                _isResettable = false;

                // If Complete was called on this resettable decorator without an intervening Reset, we now call
                // Complete on the decoratee with the saved exception (can be null). Since _isResettable is false,
                // Complete forwards the call to the decoratee.
                if (_isReaderCompleted)
                {
                    Complete(_readerCompleteException);
                }
            }
        }
    }

    // The latest consumed position given by the caller while resettable; cleared by Reset and once the decoratee
    // itself consumes the data (non-resettable AdvanceTo).
    private SequencePosition? _consumed;
    private readonly PipeReader _decoratee;

    // The latest examined position given by the caller that is not yet committed on the decoratee.
    private SequencePosition? _examined;

    // The highest examined position given to _decoratee; not affected by Reset. Cleared once the decoratee has
    // consumed up to or past it, since the position is then no longer part of the decoratee's buffer.
    private SequencePosition? _highestExamined;

    // True when the last read returned a canceled read result.
    private bool _isCanceled;

    // True when the caller completes this reader; reset by Reset.
    private bool _isReaderCompleted;
    private bool _isReadingInProgress;
    private bool _isResettable = true;
    private readonly int _maxBufferSize;
    private Exception? _readerCompleteException;

    // The latest sequence returned by _decoratee; not affected by Reset.
    private ReadOnlySequence<byte> _sequence;

    /// <summary>Constructs a resettable pipe reader decorator.</summary>
    /// <param name="decoratee">The pipe reader being decorated.</param>
    /// <param name="maxBufferSize">The maximum size of buffered data in bytes.</param>
    public ResettablePipeReaderDecorator(PipeReader decoratee, int maxBufferSize)
    {
        _decoratee = decoratee;
        _maxBufferSize = maxBufferSize;
    }

    /// <summary>Moves forward the pipeline's read cursor to after the consumed data. No data is consumed while
    /// <see cref="IsResettable"/> value is true.</summary>
    /// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
    /// <seealso cref="PipeReader.AdvanceTo(SequencePosition)"/>
    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);

    /// <summary>Moves forward the pipeline's read cursor to after the consumed data. No data is consumed while
    /// <see cref="IsResettable"/> value is true.</summary>
    /// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
    /// <param name="examined">Marks the extent of the data that has been read and examined.</param>
    /// <exception cref="InvalidOperationException">Thrown if no read is in progress, or if the decorator is resettable
    /// and was completed.</exception>
    /// <seealso cref="PipeReader.AdvanceTo(SequencePosition, SequencePosition)"/>
    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        // If reading returns a canceled read result, _isReadingInProgress is set to false since it's not required to
        // call AdvanceTo. Calling AdvanceTo after getting a canceled read result is also valid so we don't check if
        // reading is in progress in this case.
        if (!_isCanceled && !_isReadingInProgress)
        {
            throw new InvalidOperationException("Cannot call AdvanceTo before reading the PipeReader.");
        }

        _isReadingInProgress = false;

        Debug.Assert(_examined is null);

        if (_isResettable)
        {
            ThrowIfCompleted();

            // Don't call _decoratee.AdvanceTo just yet. It will be called on the next ReadAsync/TryRead call. This
            // way, if Reset is called next, it won't mark the data as examined and the following ReadAsync/TryRead
            // call won't block. It will return the buffered data.
            _examined = examined;
            _consumed = consumed;
            return;
        }

        // Non-resettable: the decoratee now consumes the data itself. All offsets are computed before forwarding the
        // call because the positions may no longer belong to the decoratee's buffer afterwards.
        bool consumedReachesHighestExamined = false;
        if (_highestExamined is SequencePosition highestExamined)
        {
            long highestExaminedOffset = _sequence.GetOffset(highestExamined);

            // The examined position given to _decoratee.AdvanceTo must be ever-increasing.
            if (_sequence.GetOffset(examined) < highestExaminedOffset)
            {
                examined = highestExamined;
            }

            consumedReachesHighestExamined = _sequence.GetOffset(consumed) >= highestExaminedOffset;
        }

        _decoratee.AdvanceTo(consumed, examined);

        // The decoratee's next buffer starts at consumed: the position saved while resettable must not be used to
        // slice the following read results (it would refer to data that is no longer in the buffer).
        _consumed = null;

        if (consumedReachesHighestExamined)
        {
            // Everything up to the highest examined position is gone from the decoratee's buffer; keeping the
            // position around would make the next GetOffset call fail.
            _highestExamined = null;
        }
    }

    /// <summary>Cancels the pending <see cref="ReadAsync(CancellationToken)"/> operation without causing it to throw
    /// and without completing the <see cref="PipeReader"/>. If there is no pending operation, this cancels the next
    /// operation.</summary>
    /// <seealso cref="PipeReader.CancelPendingRead"/>
    // This method can be called from another thread so we always forward it to the decoratee directly.
    // ReadAsync/ReadAtLeastAsync/TryRead will return IsCanceled as appropriate.
    public override void CancelPendingRead() => _decoratee.CancelPendingRead();

    /// <summary>Signals to the producer that the consumer is done reading.</summary>
    /// <param name="exception">Optional <see cref="Exception" /> indicating a failure that's causing the pipeline to
    /// complete.</param>
    /// <seealso cref="PipeReader.Complete(Exception?)"/>
    /// <remarks>If <see cref="IsResettable"/> value is true, <see cref="Complete" /> is not called on the decoratee to
    /// allow reading again the data after a call to <see cref="Reset" />. To complete the decoratee, <see
    /// cref="IsResettable" /> must be set to <see langword="false" />.</remarks>
    public override void Complete(Exception? exception = default)
    {
        if (!_isResettable)
        {
            _isReadingInProgress = false;
            _decoratee.Complete(exception);
            return;
        }

        // We naturally don't complete the decoratee, otherwise this decorator would no longer be resettable.

        if (_isReadingInProgress)
        {
            // Terminate the read in progress without consuming anything.
            AdvanceTo(_sequence.Start);
        }

        if (!_isReaderCompleted)
        {
            // Only save the first call to Complete.
            _isReaderCompleted = true;
            _readerCompleteException = exception;
        }
    }

    /// <summary>Asynchronously reads a sequence of bytes from the current <see cref="PipeReader"/>.</summary>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> representing the asynchronous read operation.</returns>
    /// <seealso cref="PipeReader.ReadAsync(CancellationToken)"/>
    public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
    {
        BeginRead();

        ReadResult readResult;
        try
        {
            readResult = await _decoratee.ReadAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation doesn't affect the buffered data: the decorator remains resettable.
            _isReadingInProgress = false;
            throw;
        }
        catch
        {
            // Same cleanup as ReadAtLeastAsyncCore: the read is over and the decorator can no longer be reset.
            _isReadingInProgress = false;
            _isResettable = false;
            throw;
        }
        return ProcessReadResult(readResult);
    }

    /// <summary>Resets this pipe reader.</summary>
    /// <exception cref="InvalidOperationException">Thrown if <see cref="IsResettable" /> is <see langword="false" /> or
    /// if reading is in progress.</exception>
    public void Reset()
    {
        if (!_isResettable)
        {
            throw new InvalidOperationException("Cannot reset non-resettable pipe reader decorator.");
        }

        if (_isReadingInProgress)
        {
            throw new InvalidOperationException(
                "The resettable pipe reader decorator cannot be reset while reading is in progress.");
        }

        if (_examined is not null)
        {
            // Don't commit the caller's examined data on the decoratee. This ensures that the next ReadAsync call
            // returns synchronously with the decoratee's buffered data (instead of blocking).
            _decoratee.AdvanceTo(_sequence.Start, _highestExamined ?? _sequence.Start);
            _examined = null;
        }

        _consumed = null;
        _isReaderCompleted = false;
        _readerCompleteException = null;
    }

    /// <summary>Attempts to synchronously read data from the <see cref="PipeReader"/>.</summary>
    /// <param name="result">When this method returns <see langword="true"/>, this value is set to a
    /// <see cref="ReadResult"/> instance that represents the result of the read call; otherwise, this value is set to
    /// <see langword="default"/>.</param>
    /// <returns><see langword="true"/> if data was available, or if the call was canceled or the writer was completed;
    /// otherwise, <see langword="false"/>.</returns>
    /// <seealso cref="PipeReader.TryRead(out ReadResult)"/>
    public override bool TryRead(out ReadResult result)
    {
        BeginRead();

        try
        {
            if (_decoratee.TryRead(out result))
            {
                result = ProcessReadResult(result);
                return true;
            }

            // Nothing was read: there is no read to terminate with AdvanceTo.
            _isReadingInProgress = false;
            return false;
        }
        catch
        {
            // Same cleanup as ReadAtLeastAsyncCore: the read is over and the decorator can no longer be reset.
            _isReadingInProgress = false;
            _isResettable = false;
            throw;
        }
    }

    /// <summary>Asynchronously reads a sequence of bytes from the current PipeReader.</summary>
    /// <param name="minimumSize">The minimum length that needs to be buffered in order for the call to return.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> representing the asynchronous read operation.</returns>
    /// <exception cref="ArgumentException">Thrown if <paramref name="minimumSize" /> added to the size of the data
    /// already consumed by the caller exceeds <see cref="int.MaxValue" />.</exception>
    protected override async ValueTask<ReadResult> ReadAtLeastAsyncCore(
        int minimumSize,
        CancellationToken cancellationToken = default)
    {
        BeginRead();

        // The decoratee still holds the bytes the caller already consumed on this decorator, so they must be added
        // to the minimum size requested from the decoratee.
        long size = (_consumed is null ? 0 : _sequence.GetOffset(_consumed.Value)) + minimumSize;
        try
        {
            minimumSize = checked((int)size);
        }
        catch (OverflowException exception)
        {
            // No read was started on the decoratee.
            _isReadingInProgress = false;

            // In theory this shouldn't happen if _maxBufferSize is set to a reasonable value.
            throw new ArgumentException(
                $"{minimumSize} is too large and would cause the buffered data to be larger than int.MaxValue",
                nameof(minimumSize),
                exception);
        }

        ReadResult readResult;
        try
        {
            readResult = await _decoratee.ReadAtLeastAsync(minimumSize, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation doesn't affect the buffered data: the decorator remains resettable.
            _isReadingInProgress = false;
            throw;
        }
        catch
        {
            _isReadingInProgress = false;
            _isResettable = false;
            throw;
        }
        return ProcessReadResult(readResult);
    }

    /// <summary>Advances the decoratee to the examined position saved by the AdvanceTo call on the decorator.</summary>
    /// <remarks>Calling AdvanceTo on the decoratee commits the latest examined data and ensures the next read call will
    /// read additional data if all the data was examined. If the decorator is reset, this ensures that the next read
    /// call will immediately return the buffered data instead of blocking.</remarks>
    private void AdvanceDecoratee()
    {
        _isCanceled = false;

        if (_isResettable && _examined is not null)
        {
            // The examined position given to _decoratee.AdvanceTo must be ever-increasing.
            if (_highestExamined is null ||
                _sequence.GetOffset(_examined.Value) > _sequence.GetOffset(_highestExamined.Value))
            {
                _highestExamined = _examined;
            }

            // Nothing is consumed on the decoratee while resettable: only the examined position moves.
            _decoratee.AdvanceTo(_sequence.Start, _highestExamined.Value);

            _examined = null;
        }
    }

    /// <summary>Performs the checks and bookkeeping common to all the read methods before reading the
    /// decoratee.</summary>
    /// <exception cref="InvalidOperationException">Thrown if the reader is completed or if a read is already in
    /// progress.</exception>
    private void BeginRead()
    {
        // Check for completion first so that a failed check doesn't leave the reading in progress flag set.
        ThrowIfCompleted();

        _isReadingInProgress = !_isReadingInProgress ? true :
            throw new InvalidOperationException("Reading is already in progress.");

        AdvanceDecoratee();
    }

    /// <summary>Records the result of a successful read on the decoratee and converts it into the result returned to
    /// the caller.</summary>
    /// <param name="readResult">The read result returned by the decoratee.</param>
    /// <returns>The read result without the bytes the caller already consumed on this decorator.</returns>
    private ReadResult ProcessReadResult(ReadResult readResult)
    {
        if (readResult.IsCanceled)
        {
            // The caller is not required to call AdvanceTo after a canceled read.
            _isCanceled = true;
            _isReadingInProgress = false;
        }

        _sequence = readResult.Buffer;

        if (_consumed is SequencePosition consumed)
        {
            // Remove bytes marked as consumed
            readResult = new ReadResult(
                readResult.Buffer.Slice(consumed),
                readResult.IsCanceled,
                readResult.IsCompleted);
        }

        // We don't retry when the buffered data exceeds the maximum buffer size.
        if (_isResettable && (_sequence.Length > _maxBufferSize))
        {
            _isResettable = false;
        }

        return readResult;
    }

    /// <summary>Throws <see cref="InvalidOperationException" /> if <see cref="Complete" /> was called on this
    /// decorator without an intervening <see cref="Reset" />. The decorator also becomes non-resettable in this
    /// case.</summary>
    private void ThrowIfCompleted()
    {
        if (_isReaderCompleted)
        {
            _isResettable = false;
            throw new InvalidOperationException("The pipe reader is completed.");
        }
    }
}