| | | 1 | | // Copyright (c) ZeroC, Inc. |
| | | 2 | | |
| | | 3 | | using System.Diagnostics; |
| | | 4 | | using System.IO.Pipelines; |
| | | 5 | | using ZeroC.Slice.Codec; |
| | | 6 | | |
| | | 7 | | namespace IceRpc.Internal; |
| | | 8 | | |
/// <summary>Extension methods that read size-prefixed Slice segments from a <see cref="PipeReader" />.</summary>
internal static class PipeReaderExtensions
{
    /// <summary>Asynchronously reads one Slice segment from a pipe reader.</summary>
    /// <param name="reader">The pipe reader to read from.</param>
    /// <param name="maxSize">The largest segment size accepted by this call.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A read result whose buffer holds the segment, unless <see cref="ReadResult.IsCanceled" /> is
    /// <see langword="true" />.</returns>
    /// <exception cref="InvalidDataException">Thrown when the segment size cannot be decoded, when it exceeds
    /// <paramref name="maxSize" />, or when the reader completes before the whole segment is available.
    /// </exception>
    /// <remarks>The caller remains responsible for calling AdvanceTo on the reader. The segment size is decoded from
    /// the data itself and the returned buffer holds exactly that many bytes. Since this method frequently examines
    /// the buffer it hands back, the caller should never examine less than Buffer.End.</remarks>
    internal static async ValueTask<ReadResult> ReadSliceSegmentAsync(
        this PipeReader reader,
        int maxSize,
        CancellationToken cancellationToken)
    {
        Debug.Assert(maxSize is > 0 and < int.MaxValue);

        // No synchronous read is attempted here; callers that want one use TryReadSliceSegment first.

        ReadResult result;
        int size;
        bool sizeDecoded = false;

        // Keep reading until either a full segment is buffered (return) or at least the size prefix is decoded.
        do
        {
            result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);

            try
            {
                if (IsCompleteSegment(ref result, maxSize, out size, out long prefixLength))
                {
                    return result;
                }

                sizeDecoded = size > 0;
                if (sizeDecoded)
                {
                    Debug.Assert(prefixLength > 0);

                    // The size prefix is known but the buffered data is too short: consume the prefix, mark
                    // everything as examined, and fetch the rest below.
                    reader.AdvanceTo(result.Buffer.GetPosition(prefixLength), result.Buffer.End);
                }
                else
                {
                    Debug.Assert(!result.IsCompleted); // see IsCompleteSegment

                    // Not even the size prefix could be decoded: consume nothing, examine everything, and read
                    // again to get more bytes.
                    reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
                }
            }
            catch
            {
                // A read method that throws must not leave the reader in a "reading" state.
                reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
                throw;
            }
        }
        while (!sizeDecoded);

        result = await reader.ReadAtLeastAsync(size, cancellationToken).ConfigureAwait(false);

        if (result.IsCanceled)
        {
            return result;
        }

        long available = result.Buffer.Length;

        if (available < size)
        {
            Debug.Assert(result.IsCompleted);
            reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
            throw new InvalidDataException(
                $"The payload has {available} bytes, but {size} bytes were expected.");
        }

        if (available == size)
        {
            return result;
        }

        // More than one segment's worth of data is buffered: expose only this segment and report not-completed.
        return new ReadResult(result.Buffer.Slice(0, size), isCanceled: false, isCompleted: false);
    }

    /// <summary>Tries to read one Slice segment from a pipe reader without waiting.</summary>
    /// <param name="reader">The pipe reader to read from.</param>
    /// <param name="maxSize">The largest segment size accepted by this call.</param>
    /// <param name="readResult">The read result.</param>
    /// <returns><see langword="true" /> when <paramref name="readResult" /> holds a segment that was read
    /// synchronously, or when the read was canceled; otherwise, <see langword="false" />.</returns>
    /// <exception cref="InvalidDataException">Thrown when the segment size cannot be decoded or when it exceeds
    /// <paramref name="maxSize" />.</exception>
    /// <remarks>After a <see langword="true" /> return, the caller must call AdvanceTo on the reader and should never
    /// examine less than Buffer.End, because this method frequently examines the buffer it hands back. After a
    /// <see langword="false" /> return, the caller must call <see cref="ReadSliceSegmentAsync" />.</remarks>
    internal static bool TryReadSliceSegment(
        this PipeReader reader,
        int maxSize,
        out ReadResult readResult)
    {
        Debug.Assert(maxSize is > 0 and < int.MaxValue);

        if (!reader.TryRead(out readResult))
        {
            return false;
        }

        try
        {
            if (IsCompleteSegment(ref readResult, maxSize, out _, out _))
            {
                return true;
            }

            // The buffered data is not enough: consume nothing but mark the whole buffer as examined.
            reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
            readResult = default;
            return false;
        }
        catch
        {
            // Do not leave the reader in a "reading" state when the segment is rejected.
            reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
            throw;
        }
    }

    /// <summary>Determines whether a read result contains a whole Slice segment whose size does not exceed the
    /// maximum size.</summary>
    /// <param name="readResult">The read result to inspect. When a whole segment is present, it is replaced by a
    /// read result whose buffer is exactly that segment.</param>
    /// <param name="maxSize">The largest accepted segment size.</param>
    /// <param name="segmentSize">The decoded segment size; 0 for an empty buffer and -1 when no size was decoded.
    /// </param>
    /// <param name="consumed">The number of bytes taken by the encoded size, or 0 when no size was decoded.</param>
    /// <returns><see langword="true" /> when <paramref name="readResult" /> holds a complete segment or is canceled;
    /// otherwise, <see langword="false" />.</returns>
    /// <exception cref="InvalidDataException">Thrown when the size does not fit in an int, when it exceeds
    /// <paramref name="maxSize" />, or when the read result is completed but too short.</exception>
    /// <remarks>When this method returns <see langword="false" /> after decoding the size, both
    /// <paramref name="segmentSize" /> and <paramref name="consumed" /> are greater than 0 and can be used by the
    /// caller.</remarks>
    private static bool IsCompleteSegment(
        ref ReadResult readResult,
        int maxSize,
        out int segmentSize,
        out long consumed)
    {
        consumed = 0;
        segmentSize = -1;

        if (readResult.IsCanceled)
        {
            return true; // the buffer and the out values are irrelevant for a canceled read
        }

        var buffer = readResult.Buffer;

        if (buffer.IsEmpty)
        {
            Debug.Assert(readResult.IsCompleted);
            segmentSize = 0;
            return true; // the caller will call AdvanceTo on this buffer.
        }

        var decoder = new SliceDecoder(buffer);

        if (!decoder.TryDecodeVarUInt62(out ulong encodedSize))
        {
            if (readResult.IsCompleted)
            {
                throw new InvalidDataException("Received a Slice segment with fewer bytes than promised.");
            }

            // More data may still arrive; segmentSize stays at -1 to signal that no size was decoded.
            return false;
        }

        consumed = decoder.Consumed;

        try
        {
            segmentSize = checked((int)encodedSize);
        }
        catch (OverflowException exception)
        {
            throw new InvalidDataException("The segment size can't be larger than int.MaxValue.", exception);
        }

        if (segmentSize > maxSize)
        {
            throw new InvalidDataException("The segment size exceeds the maximum value.");
        }

        long requiredLength = consumed + segmentSize;

        if (buffer.Length < requiredLength)
        {
            if (readResult.IsCompleted)
            {
                throw new InvalidDataException(
                    $"The payload has {buffer.Length} bytes, but {segmentSize} bytes were expected.");
            }

            // segmentSize and consumed are set and can be used by the caller.
            return false;
        }

        // A segmentSize of 0 produces a read result with an empty buffer. The result is only reported as completed
        // when the segment ends exactly where the buffered data ends.
        readResult = new ReadResult(
            buffer.Slice(buffer.GetPosition(consumed), segmentSize),
            isCanceled: false,
            isCompleted: readResult.IsCompleted && buffer.Length == requiredLength);

        return true;
    }
}