< Summary

Information
Class: IceRpc.Internal.IceRpcProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs
Tag: 592_20856082467
Line coverage
91%
Covered lines: 854
Uncovered lines: 83
Coverable lines: 937
Total lines: 1599
Line coverage: 91.1%
Branch coverage
89%
Covered branches: 200
Total branches: 224
Branch coverage: 89.2%
Method coverage
100%
Covered methods: 34
Total methods: 34
Method coverage: 100%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Collections.Immutable;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Security.Authentication;
 9using ZeroC.Slice;
 10
 11namespace IceRpc.Internal;
 12
 13internal sealed class IceRpcProtocolConnection : IProtocolConnection
 14{
 15    private const int MaxGoAwayFrameBodySize = 16;
 16    private const int MaxSettingsFrameBodySize = 1024;
 17
 148718    private bool IsServer => _transportConnectionInformation is not null;
 19
 20    private Task? _acceptRequestsTask;
 21
 22    private Task? _connectTask;
 23    private IConnectionContext? _connectionContext; // non-null once the connection is established
 24    private IMultiplexedStream? _controlStream;
 25
 26    // The number of outstanding dispatches and invocations.
 27    // DisposeAsync waits until this count reaches 0 (using _dispatchesAndInvocationsCompleted) before disposing the
 28    // underlying transport connection. So when this count is greater than 0, we know _transportConnection and other
 29    // fields are not disposed.
 30    // _dispatchInvocationCount is also used for the inactivity check: the connection remains active while
 31    // _dispatchInvocationCount > 0 or _streamInputOutputCount > 0.
 32    private int _dispatchInvocationCount;
 33
 34    private readonly SemaphoreSlim? _dispatchSemaphore;
 35
 36    private readonly IDispatcher? _dispatcher;
 36937    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 36938        new(TaskCreationOptions.RunContinuationsAsynchronously);
 39
 40    private Task? _disposeTask;
 41
 42    // This cancellation token source is canceled when the connection is disposed.
 36943    private readonly CancellationTokenSource _disposedCts = new();
 44
 45    // Canceled when we receive the GoAway frame from the peer.
 36946    private readonly CancellationTokenSource _goAwayCts = new();
 47
 48    // The GoAway frame received from the peer. Read it only after _goAwayCts is canceled.
 49    private IceRpcGoAway _goAwayFrame;
 50
 51    // The number of bytes we need to encode a size up to _maxPeerHeaderSize. It's 2 for DefaultMaxIceRpcHeaderSize.
 36952    private int _headerSizeLength = 2;
 53
 54    private readonly TimeSpan _inactivityTimeout;
 55    private readonly Timer _inactivityTimeoutTimer;
 56    private string? _invocationRefusedMessage;
 57
 58    // The ID of the last bidirectional stream accepted by this connection. It's null as long as no bidirectional stream
 59    // was accepted.
 60    private ulong? _lastRemoteBidirectionalStreamId;
 61
 62    // The ID of the last unidirectional stream accepted by this connection. It's null as long as no unidirectional
 63    // stream (other than _remoteControlStream) was accepted.
 64    private ulong? _lastRemoteUnidirectionalStreamId;
 65
 66    private readonly int _maxLocalHeaderSize;
 36967    private int _maxPeerHeaderSize = ConnectionOptions.DefaultMaxIceRpcHeaderSize;
 68
 36969    private readonly Lock _mutex = new();
 70
 71    private Task? _readGoAwayTask;
 72
 73    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 74    private bool _refuseInvocations;
 75
 76    private IMultiplexedStream? _remoteControlStream;
 77
 78    private readonly CancellationTokenSource _shutdownOrGoAwayCts;
 79
 80    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 81    // holding a lock on _mutex.
 36982    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 83
 84    private Task? _shutdownTask;
 85
 86    // Keeps track of the number of stream Input and Output that are not completed yet.
 87    // It's not the same as the _dispatchInvocationCount: a dispatch or invocation can be completed while the
 88    // application is still reading an incoming frame payload that corresponds to a stream input.
 89    // ShutdownAsync waits for both _streamInputOutputCount and _dispatchInvocationCount to reach 0, while DisposeAsync
 90    // only waits for _dispatchInvocationCount to reach 0.
 91    private int _streamInputOutputCount;
 92
 93    // The streams are completed when _shutdownTask is not null and _streamInputOutputCount is 0.
 36994    private readonly TaskCompletionSource _streamsCompleted = new(TaskCreationOptions.RunContinuationsAsynchronously);
 95
 96    private readonly ITaskExceptionObserver? _taskExceptionObserver;
 97
 98    private readonly IMultiplexedConnection _transportConnection;
 99
 100    // Only set for server connections.
 101    private readonly TransportConnectionInformation? _transportConnectionInformation;
 102
 103    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 104        CancellationToken cancellationToken)
 362105    {
            // Starts the connection establishment and returns the task that tracks it. Throws
            // ObjectDisposedException when _disposeTask is already set and InvalidOperationException when
            // _connectTask is already set. On success, the returned task yields the transport connection
            // information and the task of _shutdownRequestedTcs.
 106        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 107
 108        lock (_mutex)
 362109        {
 362110            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 111
 360112            if (_connectTask is not null)
 0113            {
 0114                throw new InvalidOperationException("Cannot call connect more than once.");
 115            }
 116
                // The checks above and the assignment of _connectTask are performed under the same lock.
 360117            result = PerformConnectAsync();
 360118            _connectTask = result;
 360119        }
 360120        return result;
 121
 122        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 360123        {
 124            // Make sure we execute the function without holding the connection mutex lock.
 360125            await Task.Yield();
 126
 127            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 360128            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 360129                cancellationToken,
 360130                _disposedCts.Token);
 131
 132            TransportConnectionInformation transportConnectionInformation;
 133
 134            try
 360135            {
 136                // If the transport connection information is null, we need to connect the transport connection. It's
 137                // null for client connections. The transport connection of a server connection is established by
 138                // Server.
 360139                transportConnectionInformation = _transportConnectionInformation ??
 360140                    await _transportConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 141
 335142                _controlStream = await _transportConnection.CreateStreamAsync(
 335143                    false,
 335144                    connectCts.Token).ConfigureAwait(false);
 145
                    // The MaxHeaderSize setting is included only when _maxLocalHeaderSize differs from the default;
                    // otherwise the settings dictionary is empty.
 329146                var settings = new IceRpcSettings(
 329147                    _maxLocalHeaderSize == ConnectionOptions.DefaultMaxIceRpcHeaderSize ?
 329148                        ImmutableDictionary<IceRpcSettingKey, ulong>.Empty :
 329149                        new Dictionary<IceRpcSettingKey, ulong>
 329150                        {
 329151                            [IceRpcSettingKey.MaxHeaderSize] = (ulong)_maxLocalHeaderSize
 329152                        });
 153
 154                try
 329155                {
 329156                    await SendControlFrameAsync(
 329157                        IceRpcControlFrameType.Settings,
 329158                        settings.Encode,
 329159                        connectCts.Token).ConfigureAwait(false);
 324160                }
 5161                catch
 5162                {
 163                    // If we fail to send the Settings frame, we are in an abortive closure and we close Output to allow
 164                    // the peer to continue if it's waiting for us. This could happen when the cancellation token is
 165                    // canceled.
 5166                    _controlStream!.Output.CompleteOutput(success: false);
 5167                    throw;
 168                }
 169
 170                // Wait for the remote control stream to be accepted and read the protocol Settings frame
 324171                _remoteControlStream = await _transportConnection.AcceptStreamAsync(
 324172                    connectCts.Token).ConfigureAwait(false);
 173
 307174                await ReceiveControlFrameHeaderAsync(
 307175                    IceRpcControlFrameType.Settings,
 307176                    connectCts.Token).ConfigureAwait(false);
 177
 301178                await ReceiveSettingsFrameBody(connectCts.Token).ConfigureAwait(false);
 296179            }
 27180            catch (OperationCanceledException)
 27181            {
                    // connectCts is linked to cancellationToken and _disposedCts.Token: rethrow a cancellation for
                    // the caller's token, otherwise report that the connection was disposed.
 27182                cancellationToken.ThrowIfCancellationRequested();
 183
 7184                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 7185                throw new IceRpcException(
 7186                    IceRpcError.OperationAborted,
 7187                    "The connection establishment was aborted because the connection was disposed.");
 188            }
 7189            catch (InvalidDataException exception)
 7190            {
                    // Invalid data is reported to the caller as an IceRpcException with ConnectionAborted.
 7191                throw new IceRpcException(
 7192                    IceRpcError.ConnectionAborted,
 7193                    "The connection establishment was aborted by an icerpc protocol error.",
 7194                    exception);
 195            }
 1196            catch (AuthenticationException)
 1197            {
                    // Rethrown unchanged.
 1198                throw;
 199            }
 29200            catch (IceRpcException)
 29201            {
                    // Rethrown unchanged.
 29202                throw;
 203            }
 0204            catch (Exception exception)
 0205            {
                    // Any other exception type is not expected here: Debug.Fail flags it, then it is rethrown.
 0206                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0207                throw;
 208            }
 209
 210            // This needs to be set before starting the accept requests task below.
 296211            _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 212
 213            // We assign _readGoAwayTask and _acceptRequestsTask with _mutex locked to make sure this assignment
 214            // occurs before the start of DisposeAsync. Once _disposeTask is not null, _readGoAwayTask etc are
 215            // immutable.
 216            lock (_mutex)
 296217            {
 296218                if (_disposeTask is not null)
 0219                {
 0220                    throw new IceRpcException(
 0221                        IceRpcError.OperationAborted,
 0222                        "The connection establishment was aborted because the connection was disposed.");
 223                }
 224
 225                // Read the go away frame from the control stream.
 296226                _readGoAwayTask = ReadGoAwayAsync(_disposedCts.Token);
 227
 228                // Start a task that accepts requests (the "accept requests loop")
 296229                _acceptRequestsTask = AcceptRequestsAsync(_shutdownOrGoAwayCts.Token);
 296230            }
 231
 232            // The _acceptRequestsTask waits for this PerformConnectAsync completion before reading anything. As soon as
 233            // it receives a request, it will cancel this inactivity check.
 296234            ScheduleInactivityCheck();
 235
 296236            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 296237        }
 360238    }
 239
 240    public ValueTask DisposeAsync()
 395241    {
            // The first call starts the disposal; every call returns a ValueTask wrapping the same _disposeTask.
 242        lock (_mutex)
 395243        {
 395244            if (_disposeTask is null)
 369245            {
 369246                RefuseNewInvocations("The connection was disposed.");
 247
 369248                if (_streamInputOutputCount == 0)
 355249                {
 250                    // That's only for consistency. _streamsCompleted.Task matters only to ShutdownAsync.
 355251                    _streamsCompleted.TrySetResult();
 355252                }
 369253                if (_dispatchInvocationCount == 0)
 359254                {
 359255                    _dispatchesAndInvocationsCompleted.TrySetResult();
 359256                }
 257
                    // When no shutdown task was assigned, a completed task is used so that _shutdownTask is
                    // non-null when PerformDisposeAsync passes it to Task.WhenAll.
 369258                _shutdownTask ??= Task.CompletedTask;
 369259                _disposeTask = PerformDisposeAsync();
 369260            }
 395261        }
 395262        return new(_disposeTask);
 263
 264        async Task PerformDisposeAsync()
 369265        {
 266            // Make sure we execute the code below without holding the mutex lock.
 369267            await Task.Yield();
 268
                // Cancels the token sources linked to _disposedCts.Token, such as connectCts in ConnectAsync and
                // invocationCts in InvokeAsync.
 369269            _disposedCts.Cancel();
 270
 271            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 272
                // _connectTask is null when ConnectAsync was never called; the wait below is skipped in that case.
 369273            if (_connectTask is not null)
 360274            {
 275                // We wait for _dispatchesAndInvocationsCompleted (since dispatches and invocations are somewhat under
 276                // our control), but not for _streamsCompleted, since we can't make the application complete the
 277                // incoming payload pipe readers.
 278                try
 360279                {
 360280                    await Task.WhenAll(
 360281                        _connectTask,
 360282                        _acceptRequestsTask ?? Task.CompletedTask,
 360283                        _readGoAwayTask ?? Task.CompletedTask,
 360284                        _shutdownTask,
 360285                        _dispatchesAndInvocationsCompleted.Task).ConfigureAwait(false);
 70286                }
 290287                catch
 290288                {
 289                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 290                    // unexpected exceptions so there's no need to handle them here.
 290291                }
 360292            }
 293
 294            // If the application is still reading some incoming payload, the disposal of the transport connection can
 295            // abort this reading.
 369296            await _transportConnection.DisposeAsync().ConfigureAwait(false);
 297
 298            // It's safe to complete the output since write operations have been completed by the transport connection
 299            // disposal.
 369300            _controlStream?.Output.Complete();
 301
 302            // It's safe to complete the input since read operations have been completed by the transport connection
 303            // disposal.
 369304            _remoteControlStream?.Input.Complete();
 305
                // Release the remaining disposable fields; the inactivity timer is disposed last.
 369306            _dispatchSemaphore?.Dispose();
 369307            _disposedCts.Dispose();
 369308            _goAwayCts.Dispose();
 369309            _shutdownOrGoAwayCts.Dispose();
 310
 369311            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 369312        }
 395313    }
 314
 315    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1437316    {
            // Sends the request over a new stream and returns the response. The checks below run synchronously
            // (most of them under _mutex) and throw before any asynchronous work starts. For a one-way request,
            // PerformInvokeAsync returns an IncomingResponse without reading a response header from the stream.
 1437317        if (request.Protocol != Protocol.IceRpc)
 1318        {
 1319            throw new InvalidOperationException(
 1320                $"Cannot send {request.Protocol} request on {Protocol.IceRpc} connection.");
 321        }
 322
 323        lock (_mutex)
 1436324        {
 1436325            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 326
 1435327            if (_refuseInvocations)
 1328            {
 1329                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 330            }
 1434331            if (_connectTask is null)
 0332            {
 0333                throw new InvalidOperationException("Cannot invoke on a connection before connecting it.");
 334            }
 1434335            if (!IsServer && !_connectTask.IsCompletedSuccessfully)
 0336            {
 0337                throw new InvalidOperationException(
 0338                    "Cannot invoke on a client connection that is not fully established.");
 339            }
 340            // It's possible but rare to invoke on a server connection that is still connecting.
 341
 1434342            if (request.ServiceAddress.Fragment.Length > 0)
 0343            {
 0344                throw new NotSupportedException("The icerpc protocol does not support fragments.");
 345            }
 346
                // Incremented under _mutex; the matching decrement is in the finally block of PerformInvokeAsync.
 1434347            IncrementDispatchInvocationCount();
 1434348        }
 349
 1434350        return PerformInvokeAsync();
 351
 352        async Task<IncomingResponse> PerformInvokeAsync()
 1434353        {
 354            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1434355            using var invocationCts = CancellationTokenSource.CreateLinkedTokenSource(
 1434356                cancellationToken,
 1434357                _disposedCts.Token);
 358
 1434359            PipeReader? streamInput = null;
 360
 361            // This try/catch block cleans up streamInput (when not null) and decrements the dispatch-invocation count.
 362            try
 1434363            {
 364                // Create the stream.
 365                IMultiplexedStream stream;
 366                try
 1434367                {
 368                    // We want to cancel CreateStreamAsync as soon as the connection is being shutdown or received a
 369                    // GoAway frame.
                        // This registration is a using declaration scoped to this try block, so it is disposed once
                        // the stream creation completes.
 1434370                    using CancellationTokenRegistration _ = _shutdownOrGoAwayCts.Token.UnsafeRegister(
 5371                        cts => ((CancellationTokenSource)cts!).Cancel(),
 1434372                        invocationCts);
 373
 1434374                    stream = await _transportConnection.CreateStreamAsync(
 1434375                        bidirectional: !request.IsOneway,
 1434376                        invocationCts.Token).ConfigureAwait(false);
 377
 1425378                    streamInput = stream.IsBidirectional ? stream.Input : null;
 1425379                }
 8380                catch (OperationCanceledException)
 8381                {
                        // Rethrow a cancellation for the caller's token; any other cancellation is reported as
                        // InvocationRefused.
 8382                    cancellationToken.ThrowIfCancellationRequested();
 383
 384                    // Connection was shutdown or disposed and we did not read the payload at all.
 6385                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 386                }
 1387                catch (IceRpcException exception)
 1388                {
 1389                    RefuseNewInvocations("The connection was lost.");
 1390                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 391                }
 0392                catch (Exception exception)
 0393                {
 0394                    Debug.Fail($"CreateStreamAsync failed with an unexpected exception: {exception}");
 0395                    RefuseNewInvocations("The connection was lost.");
 0396                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 397                }
 398
                    // From here until the end of the outer try block, a cancellation of _goAwayCts calls OnGoAway
                    // with invocationCts as its state.
 1425399                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(
 1425400                    OnGoAway,
 1425401                    invocationCts);
 402
 403                PipeWriter payloadWriter;
 404
 405                try
 1425406                {
 407                    lock (_mutex)
 1425408                    {
 1425409                        if (_refuseInvocations)
 0410                        {
 411                            // Both stream.Output and stream.Input are completed by the catch and finally blocks below.
 0412                            throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 413                        }
 414
 1425415                        IncrementStreamInputOutputCount(stream.IsBidirectional);
 416
 417                        // Decorate the stream to decrement the input/output count on Complete.
 1425418                        stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
                            // streamInput is reassigned so that it refers to the Input of the decorated stream.
 1425419                        streamInput = stream.IsBidirectional ? stream.Input : null;
 1425420                    }
 421
 1425422                    EncodeHeader(stream.Output);
 1424423                    payloadWriter = request.GetPayloadWriter(stream.Output);
 1424424                }
 1425                catch
 1426                {
 1427                    stream.Output.CompleteOutput(success: false);
 1428                    throw;
 429                }
 430
 431                // From now on, we only use payloadWriter to write and we make sure to complete it.
 432
 1424433                bool hasContinuation = request.PayloadContinuation is not null;
 434                FlushResult flushResult;
 435
 436                try
 1424437                {
 1424438                    flushResult = await payloadWriter.CopyFromAsync(
 1424439                        request.Payload,
 1424440                        stream.WritesClosed,
 1424441                        endStream: !hasContinuation,
 1424442                        invocationCts.Token).ConfigureAwait(false);
 1416443                }
 8444                catch
 8445                {
 8446                    payloadWriter.CompleteOutput(success: false);
 8447                    request.PayloadContinuation?.Complete();
 8448                    throw;
 449                }
 450                finally
 1424451                {
                        // The request payload is completed whether the copy succeeded or failed.
 1424452                    request.Payload.Complete();
 1424453                }
 454
 1416455                if (flushResult.IsCompleted || flushResult.IsCanceled || !hasContinuation)
 1404456                {
 457                    // The remote reader doesn't want more data, or the copying was canceled, or there is no
 458                    // continuation: we're done.
 1404459                    payloadWriter.CompleteOutput(!flushResult.IsCanceled);
 1404460                    request.PayloadContinuation?.Complete();
 1404461                }
 462                else
 12463                {
 464                    // Sends the payload continuation in a background thread.
 12465                    SendRequestPayloadContinuation(
 12466                        request,
 12467                        payloadWriter,
 12468                        stream.WritesClosed,
 12469                        OnGoAway,
 12470                        invocationCts.Token);
 12471                }
 472
 1416473                if (request.IsOneway)
 1014474                {
                        // One-way request: return a response without reading a response header from the stream.
 1014475                    return new IncomingResponse(request, _connectionContext!);
 476                }
 477
 402478                Debug.Assert(streamInput is not null);
 479
 480                try
 402481                {
                        // Reads the response header segment; _maxLocalHeaderSize is passed as the size limit argument.
 402482                    ReadResult readResult = await streamInput.ReadSegmentAsync(
 402483                        SliceEncoding.Slice2,
 402484                        _maxLocalHeaderSize,
 402485                        invocationCts.Token).ConfigureAwait(false);
 486
 487                    // Nothing cancels the stream input pipe reader.
 379488                    Debug.Assert(!readResult.IsCanceled);
 489
 379490                    if (readResult.Buffer.IsEmpty)
 0491                    {
 0492                        throw new IceRpcException(
 0493                            IceRpcError.IceRpcError,
 0494                            "Received an icerpc response with an empty header.");
 495                    }
 496
                        // NOTE(review): the declaration below appears truncated in this listing; `fields` and
                        // `fieldsPipeReader`, used further down, are presumably declared here - confirm against the
                        // original source file.
 379497                    (StatusCode statusCode, string? errorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> 
 379498                        DecodeHeader(readResult.Buffer);
 379499                    stream.Input.AdvanceTo(readResult.Buffer.End);
 500
 379501                    if (statusCode == StatusCode.TruncatedPayload && invocationCts.Token.IsCancellationRequested)
 0502                    {
 503                        // Canceling the sending of the payload continuation triggers the completion of the stream
 504                        // output. This may lead to a TruncatedPayload if the dispatch is currently reading the payload
 505                        // continuation. In such cases, we prioritize throwing an OperationCanceledException.
 0506                        fieldsPipeReader?.Complete();
 0507                        invocationCts.Token.ThrowIfCancellationRequested();
 0508                    }
 509
 379510                    var response = new IncomingResponse(
 379511                        request,
 379512                        _connectionContext!,
 379513                        statusCode,
 379514                        errorMessage,
 379515                        fields,
 379516                        fieldsPipeReader)
 379517                    {
 379518                        Payload = streamInput
 379519                    };
 520
 379521                    streamInput = null; // response now owns the stream input
 379522                    return response;
 523                }
 1524                catch (InvalidDataException exception)
 1525                {
 1526                    throw new IceRpcException(
 1527                        IceRpcError.IceRpcError,
 1528                        "Received an icerpc response with an invalid header.",
 1529                        exception);
 530                }
 531
                    // Callback for the cancellation of _goAwayCts. It cancels the CancellationTokenSource received as
                    // state when the stream is not started, or when its ID is greater than or equal to the matching
                    // stream ID of the GoAway frame.
 532                void OnGoAway(object? cts)
 12533                {
 12534                    if (!stream.IsStarted ||
 12535                        stream.Id >=
 12536                            (stream.IsBidirectional ?
 12537                                _goAwayFrame.BidirectionalStreamId :
 12538                                _goAwayFrame.UnidirectionalStreamId))
 4539                    {
 540                        // The request wasn't received by the peer so it's safe to cancel the invocation.
 4541                        ((CancellationTokenSource)cts!).Cancel();
 4542                    }
 12543                }
 544            }
                // The filter limits this handler to cancellations carrying invocationCts.Token; any other
                // OperationCanceledException propagates unchanged.
 17545            catch (OperationCanceledException exception) when (exception.CancellationToken == invocationCts.Token)
 14546            {
 14547                cancellationToken.ThrowIfCancellationRequested();
 548
 7549                if (_disposedCts.IsCancellationRequested)
 4550                {
 551                    // DisposeAsync aborted the request.
 4552                    throw new IceRpcException(IceRpcError.OperationAborted);
 553                }
 554                else
 3555                {
 3556                    Debug.Assert(_goAwayCts.IsCancellationRequested);
 3557                    throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 558                }
 559            }
 560            finally
 1434561            {
                    // streamInput is null here when the response took ownership of it or when no input was assigned.
 1434562                streamInput?.Complete();
 1434563                DecrementDispatchInvocationCount();
 1434564            }
 565
                // Decodes the status code, the error message (only when the status code is not Ok) and the fields
                // from the response header buffer.
                // NOTE(review): the two lines below appear truncated in this listing (the end of the tuple return
                // type and the function name are missing); the call site above uses the name DecodeHeader - confirm
                // against the original source file.
 566            static (StatusCode StatusCode, string? ErrorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>>, 
 567                ReadOnlySequence<byte> buffer)
 379568            {
 379569                var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 570
 379571                StatusCode statusCode = decoder.DecodeStatusCode();
 379572                string? errorMessage = statusCode == StatusCode.Ok ? null : decoder.DecodeString();
 573
 379574                (IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 379575                    DecodeFieldDictionary(
 379576                        ref decoder,
 382577                        (ref SliceDecoder decoder) => decoder.DecodeResponseFieldKey());
 578
 379579                return (statusCode, errorMessage, fields, pipeReader);
 379580            }
 581
                // Encodes the request header to streamOutput: reserves _headerSizeLength bytes for the header size,
                // encodes the header and the request fields, checks the resulting size with CheckPeerHeaderSize and
                // finally writes the size into the reserved placeholder.
 582            void EncodeHeader(PipeWriter streamOutput)
 1425583            {
 1425584                var encoder = new SliceEncoder(streamOutput, SliceEncoding.Slice2);
 585
 586                // Write the IceRpc request header.
 1425587                Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 588
 589                // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 1425590                long headerStartPos = streamOutput.UnflushedBytes;
 591
 1425592                var header = new IceRpcRequestHeader(request.ServiceAddress.Path, request.Operation);
 593
 1425594                header.Encode(ref encoder);
 595
 1425596                EncodeFieldDictionary(
 1425597                    request.Fields,
 14598                    (ref SliceEncoder encoder, RequestFieldKey key) => encoder.EncodeRequestFieldKey(key),
 1425599                    ref encoder,
 1425600                    streamOutput);
 601
 602                // We're done with the header encoding, write the header size.
 1425603                int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 1425604                CheckPeerHeaderSize(headerSize);
 1424605                SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 1424606            }
 1393607        }
 1434608    }
 609
 610    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 98611    {
 612        lock (_mutex)
 98613        {
 98614            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 615
 95616            if (_shutdownTask is not null)
 2617            {
 2618                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 619            }
 93620            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3621            {
 3622                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 623            }
 624
 90625            RefuseNewInvocations("The connection was shut down.");
 626
 90627            if (_streamInputOutputCount == 0)
 67628            {
 67629                _streamsCompleted.TrySetResult();
 67630            }
 90631            if (_dispatchInvocationCount == 0)
 66632            {
 66633                _dispatchesAndInvocationsCompleted.TrySetResult();
 66634            }
 635
 90636            _shutdownTask = PerformShutdownAsync();
 90637        }
 90638        return _shutdownTask;
 639
 640        async Task PerformShutdownAsync()
 90641        {
 90642            await Task.Yield(); // exit mutex lock
 643
 90644            _shutdownOrGoAwayCts.Cancel();
 645
 646            try
 90647            {
 90648                Debug.Assert(_acceptRequestsTask is not null);
 90649                Debug.Assert(_controlStream is not null);
 90650                Debug.Assert(_readGoAwayTask is not null);
 90651                Debug.Assert(_remoteControlStream is not null);
 652
 90653                await _acceptRequestsTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 654
 79655                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 656
 657                // Once shutdownTask is not null, _lastRemoteBidirectionalStreamId and _lastRemoteUnidirectionalStreamId
 658                // are immutable.
 659
 660                // When this peer is the server endpoint, the first accepted bidirectional stream ID is 0. When this
 661                // peer is the client endpoint, the first accepted bidirectional stream ID is 1.
 79662                IceRpcGoAway goAwayFrame = new(
 79663                    _lastRemoteBidirectionalStreamId is ulong value ? value + 4 : (IsServer ? 0ul : 1ul),
 79664                    (_lastRemoteUnidirectionalStreamId ?? _remoteControlStream.Id) + 4);
 665
 666                try
 79667                {
 79668                    _ = await SendControlFrameAsync(
 79669                        IceRpcControlFrameType.GoAway,
 79670                        goAwayFrame.Encode,
 79671                        cts.Token).ConfigureAwait(false);
 672
 673                    // Wait for the peer to send back a GoAway frame. The task should already be completed if the
 674                    // shutdown was initiated by the peer.
 77675                    await _readGoAwayTask.WaitAsync(cts.Token).ConfigureAwait(false);
 676
 677                    // Wait for all streams (other than the control streams) to have their Input and Output completed.
 67678                    await _streamsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 679
 680                    // Close the control stream to notify the peer that on our side, all the streams completed and that
 681                    // it can close the transport connection whenever it likes.
 66682                    _controlStream.Output.CompleteOutput(success: true);
 66683                }
 13684                catch
 13685                {
 686                    // If we fail to send the GoAway frame or some other failure occur (such as
 687                    // OperationCanceledException) we are in an abortive closure and we close Output to allow
 688                    // the peer to continue if it's waiting for us.
 13689                    _controlStream.Output.CompleteOutput(success: false);
 13690                    throw;
 691                }
 692
 693                // Wait for the peer notification that on its side all the streams are completed. It's important to wait
 694                // for this notification before closing the connection. In particular with QUIC where closing the
 695                // connection before all the streams are processed could lead to a stream failure.
 696                try
 66697                {
 698                    // Wait for the _remoteControlStream Input completion.
 66699                    ReadResult readResult = await _remoteControlStream.Input.ReadAsync(cts.Token).ConfigureAwait(false);
 700
 64701                    Debug.Assert(!readResult.IsCanceled);
 702
 64703                    if (!readResult.IsCompleted || !readResult.Buffer.IsEmpty)
 0704                    {
 0705                        throw new IceRpcException(
 0706                            IceRpcError.IceRpcError,
 0707                            "Received bytes on the control stream after receiving the GoAway frame.");
 708                    }
 709
 710                    // We can now safely close the connection.
 64711                    await _transportConnection.CloseAsync(MultiplexedConnectionCloseError.NoError, cts.Token)
 64712                        .ConfigureAwait(false);
 64713                }
 2714                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.ConnectionClosedByPeer)
 0715                {
 716                    // Expected if the peer closed the connection first.
 0717                }
 718
 719                // We wait for the completion of the dispatches that we created (and, secondarily, invocations).
 64720                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 64721            }
 9722            catch (OperationCanceledException)
 9723            {
 9724                cancellationToken.ThrowIfCancellationRequested();
 725
 3726                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3727                throw new IceRpcException(
 3728                    IceRpcError.OperationAborted,
 3729                    "The connection shutdown was aborted because the connection was disposed.");
 730            }
 0731            catch (InvalidDataException exception)
 0732            {
 0733                throw new IceRpcException(
 0734                    IceRpcError.IceRpcError,
 0735                    "The connection shutdown was aborted by an icerpc protocol error.",
 0736                    exception);
 737            }
 17738            catch (IceRpcException)
 17739            {
 17740                throw;
 741            }
 0742            catch (Exception exception)
 0743            {
 0744                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0745                throw;
 746            }
 64747        }
 90748    }
 749
    /// <summary>Constructs an icerpc protocol connection over a multiplexed transport connection.</summary>
    /// <param name="transportConnection">The multiplexed transport connection, stored in _transportConnection.</param>
    /// <param name="transportConnectionInformation">The transport connection information, or null. IsServer returns
    /// true when this value is not null.</param>
    /// <param name="options">The connection options; this constructor reads Dispatcher, MaxIceRpcHeaderSize,
    /// MaxDispatches and InactivityTimeout.</param>
    /// <param name="taskExceptionObserver">The optional task exception observer.</param>
 369750    internal IceRpcProtocolConnection(
 369751        IMultiplexedConnection transportConnection,
 369752        TransportConnectionInformation? transportConnectionInformation,
 369753        ConnectionOptions options,
 369754        ITaskExceptionObserver? taskExceptionObserver)
 369755    {
        // This token source is canceled when either _disposedCts or _goAwayCts is canceled.
 369756        _shutdownOrGoAwayCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, _goAwayCts.Token);
 757
 369758        _taskExceptionObserver = taskExceptionObserver;
 759
 369760        _transportConnection = transportConnection;
 369761        _dispatcher = options.Dispatcher;
 369762        _maxLocalHeaderSize = options.MaxIceRpcHeaderSize;
 369763        _transportConnectionInformation = transportConnectionInformation;
 764
        // The dispatch semaphore is created only when MaxDispatches is greater than 0; otherwise it remains null.
 369765        if (options.MaxDispatches > 0)
 369766        {
 369767            _dispatchSemaphore = new SemaphoreSlim(
 369768                initialCount: options.MaxDispatches,
 369769                maxCount: options.MaxDispatches);
 369770        }
 771
 369772        _inactivityTimeout = options.InactivityTimeout;
        // The timer is created with only a callback; presumably it is armed later by ScheduleInactivityCheck (not
        // shown here) - confirm. The callback requests a shutdown only when no shutdown has started and both the
        // dispatch-invocation count and the stream input/output count are 0.
 369773        _inactivityTimeoutTimer = new Timer(_ =>
 5774        {
 5775            bool requestShutdown = false;
 369776
 369777            lock (_mutex)
 5778            {
 5779                if (_shutdownTask is null && _dispatchInvocationCount == 0 && _streamInputOutputCount == 0)
 5780                {
 5781                    requestShutdown = true;
 5782                    RefuseNewInvocations(
 5783                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 5784                }
 5785            }
 369786
 5787            if (requestShutdown)
 5788            {
 369789                // TrySetResult must be called outside the mutex lock
 5790                _shutdownRequestedTcs.TrySetResult();
 5791            }
 374792        });
 369793    }
 794
 795    private static (IDictionary<TKey, ReadOnlySequence<byte>>, PipeReader?) DecodeFieldDictionary<TKey>(
 796        ref SliceDecoder decoder,
 797        DecodeFunc<TKey> decodeKeyFunc) where TKey : struct
 1793798    {
 1793799        int count = decoder.DecodeSize();
 800
 801        IDictionary<TKey, ReadOnlySequence<byte>> fields;
 802        PipeReader? pipeReader;
 1793803        if (count == 0)
 1776804        {
 1776805            fields = ImmutableDictionary<TKey, ReadOnlySequence<byte>>.Empty;
 1776806            pipeReader = null;
 1776807            decoder.CheckEndOfBuffer();
 1776808        }
 809        else
 17810        {
 811            // We don't use the normal collection allocation check here because SizeOf<ReadOnlySequence<byte>> is quite
 812            // large (24).
 813            // For example, say we decode a fields dictionary with a single field with an empty value. It's encoded
 814            // using 1 byte (dictionary size) + 1 byte (key) + 1 byte (value size) = 3 bytes. The decoder's default max
 815            // allocation size is 3 * 8 = 24. If we simply call IncreaseCollectionAllocation(1 * (4 + 24)), we'll exceed
 816            // the default collection allocation limit. (sizeof TKey is currently 4 but could/should increase to 8).
 817
 818            // Each field consumes at least 2 bytes: 1 for the key and one for the value size.
 17819            if (count * 2 > decoder.Remaining)
 0820            {
 0821                throw new InvalidDataException("Too many fields.");
 822            }
 823
 17824            fields = new Dictionary<TKey, ReadOnlySequence<byte>>(count);
 17825            var pipe = new Pipe();
 17826            decoder.CopyTo(pipe.Writer);
 17827            pipe.Writer.Complete();
 828
 829            try
 17830            {
 17831                _ = pipe.Reader.TryRead(out ReadResult readResult);
 17832                var fieldsDecoder = new SliceDecoder(readResult.Buffer, SliceEncoding.Slice2);
 833
 68834                for (int i = 0; i < count; ++i)
 17835                {
 836                    // Decode the field key.
 17837                    TKey key = decodeKeyFunc(ref fieldsDecoder);
 838
 839                    // Decode and check the field value size.
 840                    int valueSize;
 841                    try
 17842                    {
 17843                        valueSize = checked((int)fieldsDecoder.DecodeVarUInt62());
 17844                    }
 0845                    catch (OverflowException exception)
 0846                    {
 0847                        throw new InvalidDataException("The field size can't be larger than int.MaxValue.", exception);
 848                    }
 849
 17850                    if (valueSize > fieldsDecoder.Remaining)
 0851                    {
 0852                        throw new InvalidDataException(
 0853                            $"The value of field '{key}' extends beyond the end of the buffer.");
 854                    }
 855
 856                    // Create a ROS reference to the field value by slicing the fields pipe reader ROS.
 17857                    ReadOnlySequence<byte> value = readResult.Buffer.Slice(fieldsDecoder.Consumed, valueSize);
 17858                    fields.Add(key, value);
 859
 860                    // Skip the field value to prepare the decoder to read the next field value.
 17861                    fieldsDecoder.Skip(valueSize);
 17862                }
 17863                fieldsDecoder.CheckEndOfBuffer();
 864
 17865                pipe.Reader.AdvanceTo(readResult.Buffer.Start); // complete read without consuming anything
 866
 17867                pipeReader = pipe.Reader;
 17868            }
 0869            catch
 0870            {
 0871                pipe.Reader.Complete();
 0872                throw;
 873            }
 17874        }
 875
 876        // The caller is responsible for completing the pipe reader.
 1793877        return (fields, pipeReader);
 1793878    }
 879
 880    private async Task AcceptRequestsAsync(CancellationToken cancellationToken)
 296881    {
 296882        await Task.Yield(); // exit mutex lock
 883
 884        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 885        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 886        // _connectTask has completed successfully. The creation of the _acceptRequestsTask is the last action taken by
 887        // _connectTask and as a result this await can't fail.
 296888        await _connectTask!.ConfigureAwait(false);
 889
 890        try
 296891        {
 892            // We check the cancellation token for each iteration because we want to exit the accept requests loop as
 893            // soon as ShutdownAsync/GoAway requests this cancellation, even when more streams can be accepted without
 894            // waiting.
 1717895            while (!cancellationToken.IsCancellationRequested)
 1717896            {
 897                // When _dispatcher is null, the multiplexed connection MaxUnidirectionalStreams and
 898                // MaxBidirectionalStreams options are configured to not accept any request-stream from the peer. As a
 899                // result, when _dispatcher is null, this call will block indefinitely until the cancellation token is
 900                // canceled by ShutdownAsync, GoAway or DisposeAsync.
 1717901                IMultiplexedStream stream = await _transportConnection.AcceptStreamAsync(cancellationToken)
 1717902                    .ConfigureAwait(false);
 903
 904                lock (_mutex)
 1421905                {
 906                    // We don't want to increment _dispatchInvocationCount/_streamInputOutputCount when the connection
 907                    // is shutting down or being disposed.
 1421908                    if (_shutdownTask is not null)
 0909                    {
 910                        // Note that cancellationToken may not be canceled yet at this point.
 0911                        throw new OperationCanceledException();
 912                    }
 913
 914                    // The logic in IncrementStreamInputOutputCount requires that we increment the dispatch-invocation
 915                    // count first.
 1421916                    IncrementDispatchInvocationCount();
 1421917                    IncrementStreamInputOutputCount(stream.IsBidirectional);
 918
 919                    // Decorate the stream to decrement the stream input/output count on Complete.
 1421920                    stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 921
 922                    // The multiplexed connection guarantees that the IDs of accepted streams of a given type have ever
 923                    // increasing values.
 924
 1421925                    if (stream.IsBidirectional)
 408926                    {
 408927                        _lastRemoteBidirectionalStreamId = stream.Id;
 408928                    }
 929                    else
 1013930                    {
 1013931                        _lastRemoteUnidirectionalStreamId = stream.Id;
 1013932                    }
 1421933                }
 934
 935                // Start a task to read the stream and dispatch the request. We pass CancellationToken.None to Task.Run
 936                // because DispatchRequestAsync must clean-up the stream and the dispatch-invocation count.
 2842937                _ = Task.Run(() => DispatchRequestAsync(stream), CancellationToken.None);
 1421938            }
 0939        }
 220940        catch (OperationCanceledException)
 220941        {
 942            // Expected, the associated cancellation token source was canceled.
 220943        }
 76944        catch (IceRpcException)
 76945        {
 76946            RefuseNewInvocations("The connection was lost");
 76947            _ = _shutdownRequestedTcs.TrySetResult();
 76948            throw;
 949        }
 0950        catch (Exception exception)
 0951        {
 0952            Debug.Fail($"The accept stream task failed with an unexpected exception: {exception}");
 0953            RefuseNewInvocations("The connection was lost");
 0954            _ = _shutdownRequestedTcs.TrySetResult();
 0955            throw;
 956        }
 220957    }
 958
    /// <summary>Checks that a header size does not exceed the limit stored in _maxPeerHeaderSize.</summary>
    /// <param name="headerSize">The size of the header to check.</param>
    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.LimitExceeded"/> when
    /// <paramref name="headerSize"/> is greater than _maxPeerHeaderSize.</exception>
 959    private void CheckPeerHeaderSize(int headerSize)
 1817960    {
 1817961        if (headerSize > _maxPeerHeaderSize)
 2962        {
 2963            throw new IceRpcException(
 2964                IceRpcError.LimitExceeded,
 2965                $"The header size ({headerSize}) for an icerpc request or response is greater than the peer's max header
 966        }
 1815967    }
 968
 969    private void DecrementDispatchInvocationCount()
 2867970    {
 971        lock (_mutex)
 2867972        {
 2867973            if (--_dispatchInvocationCount == 0)
 542974            {
 542975                if (_shutdownTask is not null)
 31976                {
 31977                    _dispatchesAndInvocationsCompleted.TrySetResult();
 31978                }
 511979                else if (!_refuseInvocations && _streamInputOutputCount == 0)
 402980                {
 402981                    ScheduleInactivityCheck();
 402982                }
 542983            }
 2867984        }
 2867985    }
 986
 987    /// <summary>Decrements the stream input/output count.</summary>
 988    private void DecrementStreamInputOutputCount()
 3660989    {
 990        lock (_mutex)
 3660991        {
 3660992            if (--_streamInputOutputCount == 0)
 531993            {
 531994                if (_shutdownTask is not null)
 30995                {
 30996                    _streamsCompleted.TrySetResult();
 30997                }
 501998                else if (!_refuseInvocations && _dispatchInvocationCount == 0)
 97999                {
 1000                    // We enable the inactivity check in order to complete _shutdownRequestedTcs when inactive for too
 1001                    // long. _refuseInvocations is true when the connection is either about to be "shutdown requested",
 1002                    // or shut down / disposed. We don't need to complete _shutdownRequestedTcs in any of these
 1003                    // situations.
 971004                    ScheduleInactivityCheck();
 971005                }
 5311006            }
 36601007        }
 36601008    }
 1009
    /// <summary>Reads and decodes the request header from an accepted stream, dispatches the request and, for a
    /// request received over a bidirectional stream, writes the response to the stream.</summary>
    /// <remarks>The finally block of this method always completes the fields pipe reader (when not null) and
    /// decrements the dispatch-invocation count. When the dispatch did not succeed, it also completes the stream
    /// output and, if the request does not own it yet, the stream input.</remarks>
    /// <param name="stream">The stream that carries the request.</param>
    /// <returns>A task that completes when the dispatch completes.</returns>
 1010    private async Task DispatchRequestAsync(IMultiplexedStream stream)
 14211011    {
 1012        // _disposedCts is not disposed since we own a dispatch count.
 14211013        CancellationToken cancellationToken = stream.IsBidirectional ?
 14211014            stream.WritesClosed.AsCancellationToken(_disposedCts.Token) :
 14211015            _disposedCts.Token;
 1016
 14211017        PipeReader? fieldsPipeReader = null;
 1018        IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields;
 1019        IceRpcRequestHeader header;
 1020
        // streamInput is set to null once the request owns it; streamOutput is null for a unidirectional stream.
        // success controls the cleanup performed by the finally block.
 14211021        PipeReader? streamInput = stream.Input;
 14211022        PipeWriter? streamOutput = stream.IsBidirectional ? stream.Output : null;
 14211023        bool success = false;
 1024
 1025        try
 14211026        {
 1027            try
 14211028            {
 14211029                ReadResult readResult = await streamInput.ReadSegmentAsync(
 14211030                    SliceEncoding.Slice2,
 14211031                    _maxLocalHeaderSize,
 14211032                    cancellationToken).ConfigureAwait(false);
 1033
 14151034                if (readResult.Buffer.IsEmpty)
 11035                {
 11036                    throw new IceRpcException(IceRpcError.IceRpcError, "Received icerpc request with empty header.");
 1037                }
 1038
 14141039                (header, fields, fieldsPipeReader) = DecodeHeader(readResult.Buffer);
 14141040                streamInput.AdvanceTo(readResult.Buffer.End);
 14141041            }
 31042            catch (InvalidDataException exception)
 31043            {
                // An invalid header is reported as an IceRpcException: it is thrown when there is no task exception
                // observer, and given to the observer otherwise.
 31044                var rpcException = new IceRpcException(
 31045                    IceRpcError.IceRpcError,
 31046                    "Received invalid icerpc request header.",
 31047                    exception);
 1048
 31049                if (_taskExceptionObserver is null)
 11050                {
 11051                    throw rpcException;
 1052                }
 1053                else
 21054                {
 21055                    _taskExceptionObserver.DispatchRefused(
 21056                        _connectionContext!.TransportConnectionInformation,
 21057                        rpcException);
 21058                    return; // success remains false
 1059                }
 1060            }
 41061            catch (Exception exception) when (_taskExceptionObserver is not null)
 41062            {
 41063                _taskExceptionObserver.DispatchRefused(_connectionContext!.TransportConnectionInformation, exception);
 41064                return; // success remains false
 1065            }
 1066
 14141067            using var request = new IncomingRequest(Protocol.IceRpc, _connectionContext!)
 14141068            {
 14141069                Fields = fields,
 14141070                IsOneway = !stream.IsBidirectional,
 14141071                Operation = header.Operation,
 14141072                Path = header.Path,
 14141073                Payload = streamInput
 14141074            };
 1075
 14141076            streamInput = null; // the request now owns streamInput
 1077
 1078            try
 14141079            {
 14141080                OutgoingResponse response = await PerformDispatchRequestAsync(request, cancellationToken)
 14141081                    .ConfigureAwait(false);
 1082
                // A response is written only for a two-way request.
 14051083                if (!request.IsOneway)
 3931084                {
 3931085                    Debug.Assert(streamOutput is not null);
 3931086                    EncodeHeader(response);
 1087
 3911088                    PipeWriter payloadWriter = response.GetPayloadWriter(streamOutput);
 1089
 1090                    // We give flushResult an initial "failed" value, in case the first CopyFromAsync throws.
 3911091                    var flushResult = new FlushResult(isCanceled: true, isCompleted: false);
 1092
 1093                    try
 3911094                    {
 1095                        // We don't use cancellationToken here because it's canceled shortly afterwards by the
 1096                        // completion of writesClosed. This works around https://github.com/dotnet/runtime/issues/82704
 1097                        // where the stream would otherwise be aborted after the successful write. It's also fine to
 1098                        // just use _disposedCts.Token: if writes are closed because the peer is no longer interested
 1099                        // in the response, the write operations will raise an IceRpcException(StreamAborted) which is
 1100                        // ignored.
 3911101                        bool hasContinuation = response.PayloadContinuation is not null;
 1102
 3911103                        flushResult = await payloadWriter.CopyFromAsync(
 3911104                            response.Payload,
 3911105                            stream.WritesClosed,
 3911106                            endStream: !hasContinuation,
 3911107                            _disposedCts.Token).ConfigureAwait(false);
 1108
                        // The continuation is copied only when the payload copy was neither completed nor canceled.
 3861109                        if (!flushResult.IsCompleted && !flushResult.IsCanceled && hasContinuation)
 21110                        {
 21111                            flushResult = await payloadWriter.CopyFromAsync(
 21112                                response.PayloadContinuation!,
 21113                                stream.WritesClosed,
 21114                                endStream: true,
 21115                                _disposedCts.Token).ConfigureAwait(false);
 11116                        }
 3851117                    }
 1118                    finally
 3911119                    {
 3911120                        payloadWriter.CompleteOutput(success: !flushResult.IsCanceled);
 3911121                        response.Payload.Complete();
 3911122                        response.PayloadContinuation?.Complete();
 3911123                    }
 3851124                }
 13971125            }
 171126            catch (Exception exception) when (_taskExceptionObserver is not null)
 71127            {
 71128                _taskExceptionObserver.DispatchFailed(
 71129                    request,
 71130                    _connectionContext!.TransportConnectionInformation,
 71131                    exception);
 71132                return; // success remains false
 1133            }
 13971134            success = true;
 13971135        }
 21136        catch (IceRpcException)
 21137        {
 1138            // Expected, with for example:
 1139            //  - IceRpcError.ConnectionAborted when the peer aborts the connection
 1140            //  - IceRpcError.IceRpcError when the request header is invalid
 1141            //  - IceRpcError.TruncatedData when the request header is truncated
 21142        }
 91143        catch (OperationCanceledException exception) when (
 91144            exception.CancellationToken == cancellationToken ||
 91145            exception.CancellationToken == _disposedCts.Token)
 91146        {
 1147            // Expected if the dispatch is canceled by the peer or the connection is disposed.
 91148        }
 01149        catch (Exception exception)
 01150        {
 1151            // This exception is unexpected when running the IceRPC test suite. A test that expects this exception must
 1152            // install a task exception observer.
 01153            Debug.Fail($"icerpc dispatch failed with an unexpected exception: {exception}");
 1154
 1155            // Generate unobserved task exception (UTE). If this exception is expected (e.g. an expected payload read
 1156            // exception) and the application wants to avoid this UTE, it must configure a non-null logger to install
 1157            // a task exception observer.
 01158            throw;
 1159        }
 1160        finally
 14211161        {
 14211162            if (!success)
 241163            {
 1164                // We always need to complete streamOutput when an exception is thrown. For example, we received an
 1165                // invalid request header that we could not decode.
 241166                streamOutput?.CompleteOutput(success: false);
 241167                streamInput?.Complete();
 241168            }
 14211169            fieldsPipeReader?.Complete();
 1170
 14211171            DecrementDispatchInvocationCount();
 14211172        }
 1173
        // Dispatches the request with _dispatcher and returns the response. When _dispatchSemaphore is not null, the
        // dispatch waits for the semaphore first and releases it once DispatchAsync completes. An
        // OperationCanceledException carrying cancellationToken is rethrown; any other exception is converted into
        // the response created by a DispatchException.
 1174        async Task<OutgoingResponse> PerformDispatchRequestAsync(
 1175            IncomingRequest request,
 1176            CancellationToken cancellationToken)
 14141177        {
 14141178            Debug.Assert(_dispatcher is not null);
 1179
 1180            OutgoingResponse response;
 1181
 1182            try
 14141183            {
 14141184                if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 14141185                {
 14141186                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 14141187                }
 1188
 1189                try
 14141190                {
 14141191                    response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13941192                }
 1193                finally
 14141194                {
 14141195                    _dispatchSemaphore?.Release();
 14141196                }
 1197
 13941198                if (response != request.Response)
 11199                {
 11200                    throw new InvalidOperationException(
 11201                        "The dispatcher did not return the last response created for this request.");
 1202                }
 13931203            }
 101204            catch (OperationCanceledException exception) when (cancellationToken == exception.CancellationToken)
 91205            {
 91206                throw;
 1207            }
 121208            catch (Exception exception)
 121209            {
                // An exception that is not a DispatchException is wrapped in a DispatchException whose status code is
                // derived from the exception type.
 121210                if (exception is not DispatchException dispatchException)
 81211                {
 81212                    StatusCode statusCode = exception switch
 81213                    {
 11214                        InvalidDataException => StatusCode.InvalidData,
 41215                        IceRpcException iceRpcException when iceRpcException.IceRpcError == IceRpcError.TruncatedData =>
 41216                            StatusCode.TruncatedPayload,
 31217                        _ => StatusCode.InternalError
 81218                    };
 81219                    dispatchException = new DispatchException(statusCode, message: null, exception);
 81220                }
 121221                response = dispatchException.ToOutgoingResponse(request);
 121222            }
 1223
 14051224            return response;
 14051225        }
 1226
        // Decodes the request header followed by the request fields dictionary from buffer.
 1227        static (IceRpcRequestHeader, IDictionary<RequestFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 1228            ReadOnlySequence<byte> buffer)
 14141229        {
 14141230            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 14141231            var header = new IceRpcRequestHeader(ref decoder);
 14141232            (IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 14141233                DecodeFieldDictionary(
 14141234                    ref decoder,
 14281235                    (ref SliceDecoder decoder) => decoder.DecodeRequestFieldKey());
 1236
 14141237            return (header, fields, pipeReader);
 14141238        }
 1239
        // Encodes the response header into streamOutput: a size placeholder, the status code, the error message (only
        // when the status code is greater than Ok) and the response fields. The header size is checked with
        // CheckPeerHeaderSize before it is written into the placeholder.
 1240        void EncodeHeader(OutgoingResponse response)
 3931241        {
 3931242            var encoder = new SliceEncoder(streamOutput, SliceEncoding.Slice2);
 1243
 1244            // Write the IceRpc response header.
 3931245            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 1246
 1247            // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 3931248            long headerStartPos = streamOutput.UnflushedBytes;
 1249
 3931250            encoder.EncodeStatusCode(response.StatusCode);
 3931251            if (response.StatusCode > StatusCode.Ok)
 331252            {
 331253                encoder.EncodeString(response.ErrorMessage!);
 331254            }
 1255
 3931256            EncodeFieldDictionary(
 3931257                response.Fields,
 51258                (ref SliceEncoder encoder, ResponseFieldKey key) => encoder.EncodeResponseFieldKey(key),
 3931259                ref encoder,
 3931260                streamOutput);
 1261
 1262            // We're done with the header encoding, write the header size.
 3921263            int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 3921264            CheckPeerHeaderSize(headerSize);
 3911265            SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 3911266        }
 14211267    }
 1268
 1269    /// <summary>Encodes the fields dictionary at the end of a request or response header.</summary>
 1270    /// <remarks>This method can write bytes directly to <paramref name="output"/> without going through
 1271    /// <paramref name="encoder"/>.</remarks>
 1272    private void EncodeFieldDictionary<TKey>(
 1273        IDictionary<TKey, OutgoingFieldValue> fields,
 1274        EncodeAction<TKey> encodeKeyAction,
 1275        ref SliceEncoder encoder,
 1276        PipeWriter output) where TKey : struct =>
 18181277        encoder.EncodeDictionary(
 18181278            fields,
 18181279            encodeKeyAction,
 18181280            (ref SliceEncoder encoder, OutgoingFieldValue value) =>
 191281                {
 191282                    if (value.WriteAction is Action<IBufferWriter<byte>> writeAction)
 91283                    {
 91284                        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 91285                        long startPos = output.UnflushedBytes;
 91286                        writeAction(output);
 81287                        SliceEncoder.EncodeVarUInt62((ulong)(output.UnflushedBytes - startPos), sizePlaceholder);
 81288                    }
 18181289                    else
 101290                    {
 101291                        encoder.EncodeSize(checked((int)value.ByteSequence.Length));
 101292                        encoder.WriteByteSequence(value.ByteSequence);
 101293                    }
 18361294                });
 1295
  1296    /// <summary>Increments the dispatch-invocation count and cancels the inactivity check when this count and
          /// the stream input/output count were both 0 before the increment.</summary>
  1297    /// <remarks>This method must be called with _mutex locked.</remarks>
  1298    private void IncrementDispatchInvocationCount()
  28671299    {
              // The post-increment compares the value held before the increment, and the count is incremented
              // whether or not the timer is updated.
  28671300        if (_dispatchInvocationCount++ == 0 && _streamInputOutputCount == 0)
  5421301        {
  1302            // Cancel inactivity check.
  5421303            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
  5421304        }
  28671305    }
 1306
  1307    /// <summary>Increments the stream input/output count.</summary>
  1308    /// <remarks>This method must be called with _mutex locked.</remarks>
          /// <param name="bidirectional">When <see langword="true" />, the count is incremented by 2; otherwise, it is
          /// incremented by 1.</param>
  1309    private void IncrementStreamInputOutputCount(bool bidirectional)
  28461310    {
              // The caller is expected to hold a dispatch-invocation count already.
  28461311        Debug.Assert(_dispatchInvocationCount > 0);
              // NOTE(review): presumably 2 stands for the input and the output of a bidirectional stream - confirm.
  28461312        _streamInputOutputCount += bidirectional ? 2 : 1;
  28461313    }
 1314
  1315    private async Task ReadGoAwayAsync(CancellationToken cancellationToken)
  2961316    {
              // Reads the GoAway frame from the remote control stream, stores it in _goAwayFrame, then refuses new
              // invocations, cancels _goAwayCts and completes _shutdownRequestedTcs.
  2961317        await Task.Yield(); // exit mutex lock
  1318
  1319        // Wait for _connectTask (which spawned the task running this method) to complete. This await can't fail.
  1320        // This guarantees this method won't request a shutdown until after _connectTask completed successfully.
  2961321        await _connectTask!.ConfigureAwait(false);
  1322
  2961323        PipeReader remoteInput = _remoteControlStream!.Input!;
  1324
  1325        try
  2961326        {
  1327            // Wait to receive the GoAway frame.
  2961328            await ReceiveControlFrameHeaderAsync(IceRpcControlFrameType.GoAway, cancellationToken)
  2961329                .ConfigureAwait(false);
  1330
                  // Read the frame body as a segment, passing MaxGoAwayFrameBodySize as the size limit.
  771331            ReadResult readResult = await remoteInput.ReadSegmentAsync(
  771332                SliceEncoding.Slice2,
  771333                MaxGoAwayFrameBodySize,
  771334                cancellationToken).ConfigureAwait(false);
  1335
  1336            // We don't call CancelPendingRead on remoteInput
  741337            Debug.Assert(!readResult.IsCanceled);
  1338
  1339            try
  741340            {
  741341                _goAwayFrame = SliceEncoding.Slice2.DecodeBuffer(
  741342                    readResult.Buffer,
  1481343                    (ref SliceDecoder decoder) => new IceRpcGoAway(ref decoder));
  731344            }
  1345            finally
  741346            {
                      // Consume the whole buffer even when decoding throws.
  741347                remoteInput.AdvanceTo(readResult.Buffer.End);
  741348            }
  1349
                  // The GoAway frame was decoded successfully. The result of TrySetResult is deliberately discarded.
  731350            RefuseNewInvocations("The connection was shut down because it received a GoAway frame from the peer.");
  731351            _goAwayCts.Cancel();
  731352            _ = _shutdownRequestedTcs.TrySetResult();
  731353        }
  1331354        catch (OperationCanceledException)
  1331355        {
  1356            // The connection is disposed and we let this exception cancel the task.
  1331357            throw;
  1358        }
  851359        catch (IceRpcException)
  851360        {
  1361            // We let the task complete with this expected exception.
  851362            throw;
  1363        }
  51364        catch (InvalidDataException exception)
  51365        {
  1366            // "expected" in the sense it should not trigger a Debug.Fail.
                  // The invalid data is reported as an IceRpcException that keeps the original exception as its inner
                  // exception.
  51367            throw new IceRpcException(
  51368                IceRpcError.IceRpcError,
  51369                "The ReadGoAway task was aborted by an icerpc protocol error.",
  51370                exception);
  1371        }
  01372        catch (Exception exception)
  01373        {
                  // Any other exception is unexpected: fail in debug builds and rethrow it unchanged.
  01374            Debug.Fail($"The read go away task failed with an unexpected exception: {exception}");
  01375            throw;
  1376        }
  731377    }
 1378
  1379    private async ValueTask ReceiveControlFrameHeaderAsync(
  1380        IceRpcControlFrameType expectedFrameType,
  1381        CancellationToken cancellationToken)
  6031382    {
              // Reads from the remote control stream, checks that the first byte is expectedFrameType and consumes
              // that byte. Throws InvalidDataException when no data is available or when the frame type differs.
              // NOTE(review): the two throw paths below exit without calling AdvanceTo on the input; presumably the
              // callers give up on the connection after this failure - confirm.
  6031383        ReadResult readResult = await _remoteControlStream!.Input.ReadAsync(cancellationToken).ConfigureAwait(false);
  1384
  1385        // We don't call CancelPendingRead on _remoteControlStream.Input.
  3821386        Debug.Assert(!readResult.IsCanceled);
  1387
  3821388        if (readResult.Buffer.IsEmpty)
  11389        {
  11390            throw new InvalidDataException(
  11391                "Failed to read the frame type because no more data is available from the control stream.");
  1392        }
  1393
              // The frame type is the first byte of the buffer.
  3811394        var frameType = (IceRpcControlFrameType)readResult.Buffer.FirstSpan[0];
  3811395        if (frameType != expectedFrameType)
  31396        {
  31397            throw new InvalidDataException($"Received frame type {frameType} but expected {expectedFrameType}.");
  1398        }
              // Consume only the frame type byte; the remainder of the buffer is left for the next read.
  3781399        _remoteControlStream!.Input.AdvanceTo(readResult.Buffer.GetPosition(1));
  3781400    }
 1401
  1402    private async ValueTask ReceiveSettingsFrameBody(CancellationToken cancellationToken)
  3011403    {
              // Reads and decodes the body of the Settings frame from the remote control stream, and applies the
              // MaxHeaderSize setting when the peer sent it.
  1404        // We are still in the single-threaded initialization at this point.
  1405
  3011406        PipeReader input = _remoteControlStream!.Input;
              // Read the frame body as a segment, passing MaxSettingsFrameBodySize as the size limit.
  3011407        ReadResult readResult = await input.ReadSegmentAsync(
  3011408            SliceEncoding.Slice2,
  3011409            MaxSettingsFrameBodySize,
  3011410            cancellationToken).ConfigureAwait(false);
  1411
  1412        // We don't call CancelPendingRead on _remoteControlStream.Input
  3001413        Debug.Assert(!readResult.IsCanceled);
  1414
  1415        try
  3001416        {
  3001417            IceRpcSettings settings = SliceEncoding.Slice2.DecodeBuffer(
  3001418                readResult.Buffer,
  6001419                (ref SliceDecoder decoder) => new IceRpcSettings(ref decoder));
  1420
  2971421            if (settings.Value.TryGetValue(IceRpcSettingKey.MaxHeaderSize, out ulong value))
  31422            {
  1423                // a varuint62 always fits in a long
  1424                try
  31425                {
  31426                    _maxPeerHeaderSize = ConnectionOptions.IceRpcCheckMaxHeaderSize((long)value);
  21427                }
  11428                catch (ArgumentOutOfRangeException exception)
  11429                {
                          // An out-of-range value sent by the peer is reported as invalid data.
  11430                    throw new InvalidDataException("Received invalid maximum header size setting.", exception);
  1431                }
                      // _headerSizeLength becomes the number of bytes needed to encode the peer's value as a
                      // varuint62; it is the length given to GetPlaceholderSpan when this connection reserves a size.
  21432                _headerSizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
  21433            }
  1434            // all other settings are unknown and ignored
  2961435        }
  1436        finally
  3001437        {
                  // Consume the whole buffer even when decoding or validation throws.
  3001438            input.AdvanceTo(readResult.Buffer.End);
  3001439        }
  2961440    }
 1441
  1442    private void RefuseNewInvocations(string message)
  6141443    {
              // Sets _refuseInvocations and records the message under _mutex.
  1444        lock (_mutex)
  6141445        {
  6141446            _refuseInvocations = true;
                  // ??= keeps a message that is already set: only the first non-null message is recorded.
  6141447            _invocationRefusedMessage ??= message;
  6141448        }
  6141449    }
 1450
  1451    // The inactivity check executes once in _inactivityTimeout. By then either:
  1452    // - the connection is no longer inactive (and the inactivity check is canceled or being canceled)
  1453    // - the connection is still inactive and we request shutdown
          // The timer is armed with _inactivityTimeout as its due time and an infinite period, so it does not repeat.
  1454    private void ScheduleInactivityCheck() =>
  7951455        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1456
  1457    private ValueTask<FlushResult> SendControlFrameAsync(
  1458        IceRpcControlFrameType frameType,
  1459        EncodeAction encodeAction,
  1460        CancellationToken cancellationToken)
  4081461    {
              // Encodes a control frame (frame type, size, then the body produced by encodeAction) into the output
              // of the control stream and flushes it.
  4081462        PipeWriter output = _controlStream!.Output;
  1463
  4081464        EncodeFrame(output);
  1465
  4081466        return output.FlushAsync(cancellationToken); // Flush
  1467
  1468        void EncodeFrame(IBufferWriter<byte> buffer)
  4081469        {
  4081470            var encoder = new SliceEncoder(buffer, SliceEncoding.Slice2);
  4081471            encoder.EncodeIceRpcControlFrameType(frameType);
                  // Reserve _headerSizeLength bytes for the size; the size is only known after the body is encoded.
  4081472            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
  4081473            int startPos = encoder.EncodedByteCount; // does not include the size
  4081474            encodeAction.Invoke(ref encoder);
  4081475            int frameSize = encoder.EncodedByteCount - startPos;
                  // Patch the placeholder with the number of bytes written by encodeAction.
  4081476            SliceEncoder.EncodeVarUInt62((uint)frameSize, sizePlaceholder);
  4081477        }
  4081478    }
 1479
  1480    /// <summary>Sends the payload continuation of an outgoing request in the background.</summary>
  1481    /// <remarks>We send the payload continuation on a separate thread with Task.Run: this ensures that the synchronous
  1482    /// activity that could result from reading or writing the payload continuation doesn't delay in any way the
  1483    /// caller.</remarks>
  1484    /// <param name="request">The outgoing request.</param>
  1485    /// <param name="payloadWriter">The payload writer.</param>
  1486    /// <param name="writesClosed">A task that completes when we can no longer write to payloadWriter.</param>
  1487    /// <param name="onGoAway">An action to execute with a CTS when we receive the GoAway frame from the peer.</param>
  1488    /// <param name="cancellationToken">The cancellation token of the invocation; the associated CTS is disposed when
  1489    /// the invocation completes.</param>
  1490    private void SendRequestPayloadContinuation(
  1491        OutgoingRequest request,
  1492        PipeWriter payloadWriter,
  1493        Task writesClosed,
  1494        Action<object?> onGoAway,
  1495        CancellationToken cancellationToken)
  121496    {
  121497        Debug.Assert(request.PayloadContinuation is not null);
  1498
  1499        // First "detach" the continuation.
  121500        PipeReader payloadContinuation = request.PayloadContinuation;
  121501        request.PayloadContinuation = null;
  1502
  1503        lock (_mutex)
  121504        {
  121505            Debug.Assert(_dispatchInvocationCount > 0); // as a result, can't be disposed.
  1506
  1507            // Give the task its own dispatch-invocation count. This ensures the transport connection won't be disposed
  1508            // while the continuation is being sent.
  121509            IncrementDispatchInvocationCount();
  121510        }
  1511
  1512        // This background task owns payloadContinuation, payloadWriter and 1 dispatch-invocation count, and must clean
  1513        // them up. Hence CancellationToken.None.
  121514        _ = Task.Run(PerformSendRequestPayloadContinuationAsync, CancellationToken.None);
  1515
  1516        async Task PerformSendRequestPayloadContinuationAsync()
  121517        {
                  // Remains false unless CopyFromAsync returns a flush result that is not canceled; the finally block
                  // passes it to payloadWriter.CompleteOutput.
  121518            bool success = false;
  1519
  1520            try
  121521            {
  1522                // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
  121523                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
  1524
  1525                // This token registration is needed for one-way requests and is redundant for two-way requests.
  1526                // We want GoAway to cancel the sending of one-way requests that have not been received by the peer,
  1527                // especially when these requests have payload continuations.
  121528                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(onGoAway, cts);
  1529
  1530                try
  121531                {
  1532                    // The cancellation of the InvokeAsync's cancellationToken cancels cts only until InvokeAsync's
  1533                    // PerformInvokeAsync completes. Afterwards, the cancellation of InvokeAsync's cancellationToken has
  1534                    // no effect on cts, so it doesn't cancel the copying of payloadContinuation.
  121535                    FlushResult flushResult = await payloadWriter.CopyFromAsync(
  121536                        payloadContinuation,
  121537                        writesClosed,
  121538                        endStream: true,
  121539                        cts.Token).ConfigureAwait(false);
  1540
  51541                    success = !flushResult.IsCanceled;
  51542                }
  31543                catch (OperationCanceledException exception) when (exception.CancellationToken == cts.Token)
  21544                {
  1545                    // Process/translate this exception primarily for the benefit of _taskExceptionObserver.
  1546
  1547                    // Can be because cancellationToken was canceled by DisposeAsync or GoAway; that's fine.
  21548                    cancellationToken.ThrowIfCancellationRequested();
  1549
  11550                    if (_disposedCts.IsCancellationRequested)
  01551                    {
  1552                        // DisposeAsync aborted the request.
  01553                        throw new IceRpcException(IceRpcError.OperationAborted);
  1554                    }
  1555                    else
  11556                    {
  1557                        // When _goAwayCts is canceled and onGoAway cancels its argument:
  1558                        // - if PerformInvokeAsync is no longer running (typical for a one-way request), we get here
  1559                        // - if PerformInvokeAsync is still running, we may get here or cancellationToken gets canceled
  1560                        // first.
  11561                        Debug.Assert(_goAwayCts.IsCancellationRequested);
  11562                        throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
  1563                    }
  1564                }
  51565            }
                  // The order of the catch clauses matters: when a task exception observer is installed, this first
                  // clause receives every exception and none of them is rethrown.
  71566            catch (Exception exception) when (_taskExceptionObserver is not null)
  51567            {
  51568                _taskExceptionObserver.RequestPayloadContinuationFailed(
  51569                    request,
  51570                    _connectionContext!.TransportConnectionInformation,
  51571                    exception);
  51572            }
  11573            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
  11574            {
  1575                // Expected.
  11576            }
  11577            catch (IceRpcException)
  11578            {
  1579                // Expected, with for example IceRpcError.ConnectionAborted when the peer aborts the connection.
  11580            }
  01581            catch (Exception exception)
  01582            {
  1583                // This exception is unexpected when running the IceRPC test suite. A test that expects such an
  1584                // exception must install a task exception observer.
  01585                Debug.Fail($"Failed to send payload continuation of request {request}: {exception}");
  1586
  1587                // If Debug is not enabled and there is no task exception observer, we rethrow to generate an
  1588                // Unobserved Task Exception.
  01589                throw;
  1590            }
  1591            finally
  121592            {
                      // Release everything this task owns: the payload writer, the payload continuation and the
                      // dispatch-invocation count taken before Task.Run.
  121593                payloadWriter.CompleteOutput(success);
  121594                payloadContinuation.Complete();
  121595                DecrementDispatchInvocationCount();
  121596            }
  121597        }
  121598    }
 1599}

Methods/Properties

get_IsServer()
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.Transports.TransportConnectionInformation,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DisposeAsync()
PerformDisposeAsync()
InvokeAsync(IceRpc.OutgoingRequest,System.Threading.CancellationToken)
PerformInvokeAsync()
OnGoAway()
DecodeHeader()
EncodeHeader()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
DecodeFieldDictionary(ZeroC.Slice.SliceDecoder&,ZeroC.Slice.DecodeFunc`1<TKey>)
AcceptRequestsAsync()
CheckPeerHeaderSize(System.Int32)
DecrementDispatchInvocationCount()
DecrementStreamInputOutputCount()
DispatchRequestAsync()
PerformDispatchRequestAsync()
DecodeHeader()
EncodeHeader()
EncodeFieldDictionary(System.Collections.Generic.IDictionary`2<TKey,IceRpc.OutgoingFieldValue>,ZeroC.Slice.EncodeAction`1<TKey>,ZeroC.Slice.SliceEncoder&,System.IO.Pipelines.PipeWriter)
IncrementDispatchInvocationCount()
IncrementStreamInputOutputCount(System.Boolean)
ReadGoAwayAsync()
ReceiveControlFrameHeaderAsync()
ReceiveSettingsFrameBody()
RefuseNewInvocations(System.String)
ScheduleInactivityCheck()
SendControlFrameAsync(IceRpc.Internal.IceRpcControlFrameType,ZeroC.Slice.EncodeAction,System.Threading.CancellationToken)
EncodeFrame()
SendRequestPayloadContinuation(IceRpc.OutgoingRequest,System.IO.Pipelines.PipeWriter,System.Threading.Tasks.Task,System.Action`1<System.Object>,System.Threading.CancellationToken)
PerformSendRequestPayloadContinuationAsync()