| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using System.Diagnostics; |
| | 4 | | using System.IO.Pipelines; |
| | 5 | | using ZeroC.Slice; |
| | 6 | |
|
| | 7 | | namespace IceRpc.Internal; |
| | 8 | |
|
| | 9 | | /// <summary>Provides extension methods for <see cref="PipeReader" /> to decode payloads.</summary> |
| | 10 | | internal static class PipeReaderExtensions |
| | 11 | | { |
| | 12 | | /// <summary>Reads a Slice segment from a pipe reader.</summary> |
| | 13 | | /// <param name="reader">The pipe reader.</param> |
| | 14 | | /// <param name="encoding">The encoding.</param> |
| | 15 | | /// <param name="maxSize">The maximum size of this segment.</param> |
| | 16 | | /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param> |
| | 17 | | /// <returns>A read result with the segment read from the reader unless <see cref="ReadResult.IsCanceled" /> is |
| | 18 | | /// <see langword="true" />.</returns> |
| | 19 | | /// <exception cref="InvalidDataException">Thrown when the segment size could not be decoded or the segment size |
| | 20 | | /// exceeds <paramref name="maxSize" />.</exception> |
| | 21 | | /// <remarks>The caller must call AdvanceTo on the reader, as usual. With Slice1, this method reads all |
| | 22 | | /// the remaining bytes in the reader; otherwise, this method reads the segment size in the segment and returns |
| | 23 | | /// exactly segment size bytes. This method often examines the buffer it returns as part of ReadResult, |
| | 24 | | /// therefore the caller should never examine less than Buffer.End.</remarks> |
| | 25 | | internal static async ValueTask<ReadResult> ReadSegmentAsync( |
| | 26 | | this PipeReader reader, |
| | 27 | | SliceEncoding encoding, |
| | 28 | | int maxSize, |
| | 29 | | CancellationToken cancellationToken) |
| 4348 | 30 | | { |
| 4348 | 31 | | Debug.Assert(maxSize is > 0 and < int.MaxValue); |
| | 32 | |
|
| | 33 | | // This method does not attempt to read the reader synchronously. A caller that wants a sync attempt can |
| | 34 | | // call TryReadSegment. |
| | 35 | |
|
| 4348 | 36 | | if (encoding == SliceEncoding.Slice1) |
| 16 | 37 | | { |
| | 38 | | // We read everything up to the maxSize + 1. |
| | 39 | | // It's maxSize + 1 and not maxSize because if the segment's size is maxSize, we could get |
| | 40 | | // readResult.IsCompleted == false even though the full segment was read. |
| | 41 | |
|
| 16 | 42 | | ReadResult readResult = await reader.ReadAtLeastAsync(maxSize + 1, cancellationToken).ConfigureAwait(false); |
| | 43 | |
|
| 16 | 44 | | if (readResult.IsCompleted && readResult.Buffer.Length <= maxSize) |
| 16 | 45 | | { |
| 16 | 46 | | return readResult; |
| | 47 | | } |
| | 48 | | else |
| 0 | 49 | | { |
| 0 | 50 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 0 | 51 | | throw new InvalidDataException("The segment size exceeds the maximum value."); |
| | 52 | | } |
| | 53 | | } |
| | 54 | | else |
| 4332 | 55 | | { |
| | 56 | | ReadResult readResult; |
| | 57 | | int segmentSize; |
| | 58 | |
|
| 4333 | 59 | | while (true) |
| 4333 | 60 | | { |
| 4333 | 61 | | readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); |
| | 62 | |
|
| | 63 | | try |
| 4281 | 64 | | { |
| 4281 | 65 | | if (IsCompleteSegment(ref readResult, maxSize, out segmentSize, out long consumed)) |
| 4263 | 66 | | { |
| 4263 | 67 | | return readResult; |
| | 68 | | } |
| 2 | 69 | | else if (segmentSize > 0) |
| 1 | 70 | | { |
| 1 | 71 | | Debug.Assert(consumed > 0); |
| | 72 | |
|
| | 73 | | // We decoded the segmentSize and examined the whole buffer but it was not sufficient. |
| 1 | 74 | | reader.AdvanceTo(readResult.Buffer.GetPosition(consumed), readResult.Buffer.End); |
| 1 | 75 | | break; // while |
| | 76 | | } |
| | 77 | | else |
| 1 | 78 | | { |
| 1 | 79 | | Debug.Assert(!readResult.IsCompleted); // see IsCompleteSegment |
| 1 | 80 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| | 81 | | // and continue loop with at least one additional byte |
| 1 | 82 | | } |
| 1 | 83 | | } |
| 16 | 84 | | catch |
| 16 | 85 | | { |
| | 86 | | // A ReadAsync or TryRead method that throws an exception should not leave the reader in a |
| | 87 | | // "reading" state. |
| 16 | 88 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 16 | 89 | | throw; |
| | 90 | | } |
| 1 | 91 | | } |
| | 92 | |
|
| 1 | 93 | | readResult = await reader.ReadAtLeastAsync(segmentSize, cancellationToken).ConfigureAwait(false); |
| | 94 | |
|
| 1 | 95 | | if (readResult.IsCanceled) |
| 0 | 96 | | { |
| 0 | 97 | | return readResult; |
| | 98 | | } |
| | 99 | |
|
| 1 | 100 | | if (readResult.Buffer.Length < segmentSize) |
| 0 | 101 | | { |
| 0 | 102 | | Debug.Assert(readResult.IsCompleted); |
| 0 | 103 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 0 | 104 | | throw new InvalidDataException( |
| 0 | 105 | | $"The payload has {readResult.Buffer.Length} bytes, but {segmentSize} bytes were expected."); |
| | 106 | | } |
| | 107 | |
|
| 1 | 108 | | return readResult.Buffer.Length == segmentSize ? readResult : |
| 1 | 109 | | new ReadResult(readResult.Buffer.Slice(0, segmentSize), isCanceled: false, isCompleted: false); |
| | 110 | | } |
| 4280 | 111 | | } |
| | 112 | |
|
| | 113 | | /// <summary>Attempts to read a Slice segment from a pipe reader.</summary> |
| | 114 | | /// <param name="reader">The pipe reader.</param> |
| | 115 | | /// <param name="encoding">The encoding.</param> |
| | 116 | | /// <param name="maxSize">The maximum size of this segment.</param> |
| | 117 | | /// <param name="readResult">The read result.</param> |
| | 118 | | /// <returns><see langword="true" /> when <paramref name="readResult" /> contains the segment read synchronously, or |
| | 119 | | /// the call was cancelled; otherwise, <see langword="false" />.</returns> |
| | 120 | | /// <exception cref="InvalidDataException">Thrown when the segment size could not be decoded or the segment size |
| | 121 | | /// exceeds the max segment size.</exception> |
| | 122 | | /// <remarks>When this method returns <see langword="true" />, the caller must call AdvanceTo on the reader, as |
| | 123 | | /// usual. This method often examines the buffer it returns as part of ReadResult, therefore the caller should never |
| | 124 | | /// examine less than Buffer.End when the return value is <see langword="true" />. When this method returns |
| | 125 | | /// <see langword="false" />, the caller must call <see cref="ReadSegmentAsync" />.</remarks> |
| | 126 | | internal static bool TryReadSegment( |
| | 127 | | this PipeReader reader, |
| | 128 | | SliceEncoding encoding, |
| | 129 | | int maxSize, |
| | 130 | | out ReadResult readResult) |
| 313 | 131 | | { |
| 313 | 132 | | Debug.Assert(maxSize is > 0 and < int.MaxValue); |
| | 133 | |
|
| 313 | 134 | | if (encoding == SliceEncoding.Slice1) |
| 137 | 135 | | { |
| 137 | 136 | | if (reader.TryRead(out readResult)) |
| 137 | 137 | | { |
| 137 | 138 | | if (readResult.IsCanceled) |
| 0 | 139 | | { |
| 0 | 140 | | return true; // and the buffer does not matter |
| | 141 | | } |
| | 142 | |
|
| 137 | 143 | | if (readResult.Buffer.Length > maxSize) |
| 0 | 144 | | { |
| 0 | 145 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 0 | 146 | | throw new InvalidDataException("The segment size exceeds the maximum value."); |
| | 147 | | } |
| | 148 | |
|
| 137 | 149 | | if (readResult.IsCompleted) |
| 137 | 150 | | { |
| 137 | 151 | | return true; |
| | 152 | | } |
| | 153 | | else |
| 0 | 154 | | { |
| | 155 | | // don't consume anything but mark the whole buffer as examined - we need more. |
| 0 | 156 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 0 | 157 | | } |
| 0 | 158 | | } |
| | 159 | |
|
| 0 | 160 | | readResult = default; |
| 0 | 161 | | return false; |
| | 162 | | } |
| | 163 | | else |
| 176 | 164 | | { |
| 176 | 165 | | if (reader.TryRead(out readResult)) |
| 172 | 166 | | { |
| | 167 | | try |
| 172 | 168 | | { |
| 172 | 169 | | if (IsCompleteSegment(ref readResult, maxSize, out int segmentSize, out long _)) |
| 169 | 170 | | { |
| 169 | 171 | | return true; |
| | 172 | | } |
| | 173 | | else |
| 1 | 174 | | { |
| | 175 | | // we don't consume anything but examined the whole buffer since it's not sufficient. |
| 1 | 176 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 1 | 177 | | readResult = default; |
| 1 | 178 | | return false; |
| | 179 | | } |
| | 180 | | } |
| 2 | 181 | | catch |
| 2 | 182 | | { |
| 2 | 183 | | reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End); |
| 2 | 184 | | throw; |
| | 185 | | } |
| | 186 | | } |
| | 187 | | else |
| 4 | 188 | | { |
| 4 | 189 | | return false; |
| | 190 | | } |
| | 191 | | } |
| 311 | 192 | | } |
| | 193 | |
|
| | 194 | | /// <summary>Checks if a read result holds a complete Slice segment and if the segment size does not exceed the |
| | 195 | | /// maximum size.</summary> |
| | 196 | | /// <returns><see langword="true" /> when <paramref name="readResult" /> holds a complete segment or is canceled; |
| | 197 | | /// otherwise, <see langword="false" />.</returns> |
| | 198 | | /// <remarks><paramref name="segmentSize" /> and <paramref name="consumed" /> can be set when this method returns |
| | 199 | | /// <see langword="false" />. In this case, both segmentSize and consumed are greater than 0.</remarks> |
| | 200 | | private static bool IsCompleteSegment( |
| | 201 | | ref ReadResult readResult, |
| | 202 | | int maxSize, |
| | 203 | | out int segmentSize, |
| | 204 | | out long consumed) |
| 4453 | 205 | | { |
| 4453 | 206 | | consumed = 0; |
| 4453 | 207 | | segmentSize = -1; |
| | 208 | |
|
| 4453 | 209 | | if (readResult.IsCanceled) |
| 0 | 210 | | { |
| 0 | 211 | | return true; // and buffer etc. does not matter |
| | 212 | | } |
| | 213 | |
|
| 4453 | 214 | | if (readResult.Buffer.IsEmpty) |
| 27 | 215 | | { |
| 27 | 216 | | Debug.Assert(readResult.IsCompleted); |
| 27 | 217 | | segmentSize = 0; |
| 27 | 218 | | return true; // the caller will call AdvanceTo on this buffer. |
| | 219 | | } |
| | 220 | |
|
| 4426 | 221 | | var decoder = new SliceDecoder(readResult.Buffer, SliceEncoding.Slice2); |
| 4426 | 222 | | if (decoder.TryDecodeVarUInt62(out ulong ulongSize)) |
| 4417 | 223 | | { |
| 4417 | 224 | | consumed = decoder.Consumed; |
| | 225 | |
|
| | 226 | | try |
| 4417 | 227 | | { |
| 4417 | 228 | | segmentSize = checked((int)ulongSize); |
| 4417 | 229 | | } |
| 0 | 230 | | catch (OverflowException exception) |
| 0 | 231 | | { |
| 0 | 232 | | throw new InvalidDataException("The segment size can't be larger than int.MaxValue.", exception); |
| | 233 | | } |
| | 234 | |
|
| 4417 | 235 | | if (segmentSize > maxSize) |
| 6 | 236 | | { |
| 6 | 237 | | throw new InvalidDataException("The segment size exceeds the maximum value."); |
| | 238 | | } |
| | 239 | |
|
| 4411 | 240 | | if (readResult.Buffer.Length >= consumed + segmentSize) |
| 4405 | 241 | | { |
| | 242 | | // When segmentSize is 0, we return a read result with an empty buffer. |
| 4405 | 243 | | readResult = new ReadResult( |
| 4405 | 244 | | readResult.Buffer.Slice(readResult.Buffer.GetPosition(consumed), segmentSize), |
| 4405 | 245 | | isCanceled: false, |
| 4405 | 246 | | isCompleted: readResult.IsCompleted && |
| 4405 | 247 | | readResult.Buffer.Length == consumed + segmentSize); |
| | 248 | |
|
| 4405 | 249 | | return true; |
| | 250 | | } |
| | 251 | |
|
| 6 | 252 | | if (readResult.IsCompleted && consumed + segmentSize > readResult.Buffer.Length) |
| 4 | 253 | | { |
| 4 | 254 | | throw new InvalidDataException( |
| 4 | 255 | | $"The payload has {readResult.Buffer.Length} bytes, but {segmentSize} bytes were expected."); |
| | 256 | | } |
| | 257 | |
|
| | 258 | | // segmentSize and consumed are set and can be used by the caller. |
| 2 | 259 | | return false; |
| | 260 | | } |
| 9 | 261 | | else if (readResult.IsCompleted) |
| 8 | 262 | | { |
| 8 | 263 | | throw new InvalidDataException("Received a Slice segment with fewer bytes than promised."); |
| | 264 | | } |
| | 265 | | else |
| 1 | 266 | | { |
| 1 | 267 | | segmentSize = -1; |
| 1 | 268 | | return false; |
| | 269 | | } |
| 4435 | 270 | | } |
| | 271 | | } |