< Summary

Information
Class: IceRpc.Internal.IceRpcProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs
Tag: 275_13775359185
Line coverage
91%
Covered lines: 867
Uncovered lines: 81
Coverable lines: 948
Total lines: 1599
Line coverage: 91.4%
Branch coverage
89%
Covered branches: 200
Total branches: 224
Branch coverage: 89.2%
Method coverage
100%
Covered methods: 34
Total methods: 34
Method coverage: 100%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Collections.Immutable;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Security.Authentication;
 9using ZeroC.Slice;
 10
 11namespace IceRpc.Internal;
 12
 13internal sealed class IceRpcProtocolConnection : IProtocolConnection
 14{
 15    private const int MaxGoAwayFrameBodySize = 16;
 16    private const int MaxSettingsFrameBodySize = 1024;
 17
 291318    private bool IsServer => _transportConnectionInformation is not null;
 19
 20    private Task? _acceptRequestsTask;
 21
 22    private Task? _connectTask;
 23    private IConnectionContext? _connectionContext; // non-null once the connection is established
 24    private IMultiplexedStream? _controlStream;
 25
 26    // The number of outstanding dispatches and invocations.
 27    // DisposeAsync waits until this count reaches 0 (using _dispatchesAndInvocationsCompleted) before disposing the
 28    // underlying transport connection. So when this count is greater than 0, we know _transportConnection and other
 29    // fields are not disposed.
 30    // _dispatchInvocationCount is also used for the inactivity check: the connection remains active while
 31    // _dispatchInvocationCount > 0 or _streamInputOutputCount > 0.
 32    private int _dispatchInvocationCount;
 33
 34    private readonly SemaphoreSlim? _dispatchSemaphore;
 35
 36    private readonly IDispatcher? _dispatcher;
 71237    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 71238        new(TaskCreationOptions.RunContinuationsAsynchronously);
 39
 40    private Task? _disposeTask;
 41
 42    // This cancellation token source is canceled when the connection is disposed.
 71243    private readonly CancellationTokenSource _disposedCts = new();
 44
 45    // Canceled when we receive the GoAway frame from the peer.
 71246    private readonly CancellationTokenSource _goAwayCts = new();
 47
 48    // The GoAway frame received from the peer. Read it only after _goAwayCts is canceled.
 49    private IceRpcGoAway _goAwayFrame;
 50
 51    // The number of bytes we need to encode a size up to _maxPeerHeaderSize. It's 2 for DefaultMaxIceRpcHeaderSize.
 71252    private int _headerSizeLength = 2;
 53
 54    private readonly TimeSpan _inactivityTimeout;
 55    private readonly Timer _inactivityTimeoutTimer;
 56    private string? _invocationRefusedMessage;
 57
 58    // The ID of the last bidirectional stream accepted by this connection. It's null as long as no bidirectional stream
 59    // was accepted.
 60    private ulong? _lastRemoteBidirectionalStreamId;
 61
 62    // The ID of the last unidirectional stream accepted by this connection. It's null as long as no unidirectional
 63    // stream (other than _remoteControlStream) was accepted.
 64    private ulong? _lastRemoteUnidirectionalStreamId;
 65
 66    private readonly int _maxLocalHeaderSize;
 71267    private int _maxPeerHeaderSize = ConnectionOptions.DefaultMaxIceRpcHeaderSize;
 68
 71269    private readonly object _mutex = new();
 70
 71    private Task? _readGoAwayTask;
 72
 73    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 74    private bool _refuseInvocations;
 75
 76    private IMultiplexedStream? _remoteControlStream;
 77
 78    private readonly CancellationTokenSource _shutdownOrGoAwayCts;
 79
 80    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 81    // holding a lock on _mutex.
 71282    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 83
 84    private Task? _shutdownTask;
 85
 86    // Keeps track of the number of stream Input and Output that are not completed yet.
 87    // It's not the same as the _dispatchInvocationCount: a dispatch or invocation can be completed while the
 88    // application is still reading an incoming frame payload that corresponds to a stream input.
 89    // ShutdownAsync waits for both _streamInputOutputCount and _dispatchInvocationCount to reach 0, while DisposeAsync
 90    // only waits for _dispatchInvocationCount to reach 0.
 91    private int _streamInputOutputCount;
 92
 93    // The streams are completed when _shutdownTask is not null and _streamInputOutputCount is 0.
 71294    private readonly TaskCompletionSource _streamsCompleted = new(TaskCreationOptions.RunContinuationsAsynchronously);
 95
 96    private readonly ITaskExceptionObserver? _taskExceptionObserver;
 97
 98    private readonly IMultiplexedConnection _transportConnection;
 99
 100    // Only set for server connections.
 101    private readonly TransportConnectionInformation? _transportConnectionInformation;
 102
 103    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 104        CancellationToken cancellationToken)
 698105    {
 106        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 107
 698108        lock (_mutex)
 698109        {
 698110            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 111
 694112            if (_connectTask is not null)
 0113            {
 0114                throw new InvalidOperationException("Cannot call connect more than once.");
 115            }
 116
 694117            result = PerformConnectAsync();
 694118            _connectTask = result;
 694119        }
 694120        return result;
 121
 122        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 694123        {
 124            // Make sure we execute the function without holding the connection mutex lock.
 694125            await Task.Yield();
 126
 127            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 694128            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 694129                cancellationToken,
 694130                _disposedCts.Token);
 131
 132            TransportConnectionInformation transportConnectionInformation;
 133
 134            try
 694135            {
 136                // If the transport connection information is null, we need to connect the transport connection. It's
 137                // null for client connections. The transport connection of a server connection is established by
 138                // Server.
 694139                transportConnectionInformation = _transportConnectionInformation ??
 694140                    await _transportConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 141
 644142                _controlStream = await _transportConnection.CreateStreamAsync(
 644143                    false,
 644144                    connectCts.Token).ConfigureAwait(false);
 145
 633146                var settings = new IceRpcSettings(
 633147                    _maxLocalHeaderSize == ConnectionOptions.DefaultMaxIceRpcHeaderSize ?
 633148                        ImmutableDictionary<IceRpcSettingKey, ulong>.Empty :
 633149                        new Dictionary<IceRpcSettingKey, ulong>
 633150                        {
 633151                            [IceRpcSettingKey.MaxHeaderSize] = (ulong)_maxLocalHeaderSize
 633152                        });
 153
 154                try
 633155                {
 633156                    await SendControlFrameAsync(
 633157                        IceRpcControlFrameType.Settings,
 633158                        settings.Encode,
 633159                        connectCts.Token).ConfigureAwait(false);
 625160                }
 8161                catch
 8162                {
 163                    // If we fail to send the Settings frame, we are in an abortive closure and we close Output to allow
 164                    // the peer to continue if it's waiting for us. This could happen when the cancellation token is
 165                    // canceled.
 8166                    _controlStream!.Output.CompleteOutput(success: false);
 8167                    throw;
 168                }
 169
 170                // Wait for the remote control stream to be accepted and read the protocol Settings frame
 625171                _remoteControlStream = await _transportConnection.AcceptStreamAsync(
 625172                    connectCts.Token).ConfigureAwait(false);
 173
 590174                await ReceiveControlFrameHeaderAsync(
 590175                    IceRpcControlFrameType.Settings,
 590176                    connectCts.Token).ConfigureAwait(false);
 177
 577178                await ReceiveSettingsFrameBody(connectCts.Token).ConfigureAwait(false);
 567179            }
 54180            catch (OperationCanceledException)
 54181            {
 54182                cancellationToken.ThrowIfCancellationRequested();
 183
 15184                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 15185                throw new IceRpcException(
 15186                    IceRpcError.OperationAborted,
 15187                    "The connection establishment was aborted because the connection was disposed.");
 188            }
 14189            catch (InvalidDataException exception)
 14190            {
 14191                throw new IceRpcException(
 14192                    IceRpcError.ConnectionAborted,
 14193                    "The connection establishment was aborted by an icerpc protocol error.",
 14194                    exception);
 195            }
 2196            catch (AuthenticationException)
 2197            {
 2198                throw;
 199            }
 57200            catch (IceRpcException)
 57201            {
 57202                throw;
 203            }
 0204            catch (Exception exception)
 0205            {
 0206                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0207                throw;
 208            }
 209
 210            // This needs to be set before starting the accept requests task below.
 567211            _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 212
 213            // We assign _readGoAwayTask and _acceptRequestsTask with _mutex locked to make sure this assignment
 214            // occurs before the start of DisposeAsync. Once _disposeTask is not null, _readGoAwayTask etc are
 215            // immutable.
 567216            lock (_mutex)
 567217            {
 567218                if (_disposeTask is not null)
 0219                {
 0220                    throw new IceRpcException(
 0221                        IceRpcError.OperationAborted,
 0222                        "The connection establishment was aborted because the connection was disposed.");
 223                }
 224
 225                // Read the go away frame from the control stream.
 567226                _readGoAwayTask = ReadGoAwayAsync(_disposedCts.Token);
 227
 228                // Start a task that accepts requests (the "accept requests loop")
 567229                _acceptRequestsTask = AcceptRequestsAsync(_shutdownOrGoAwayCts.Token);
 567230            }
 231
 232            // The _acceptRequestsTask waits for this PerformConnectAsync completion before reading anything. As soon as
 233            // it receives a request, it will cancel this inactivity check.
 567234            ScheduleInactivityCheck();
 235
 567236            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 567237        }
 694238    }
 239
 240    public ValueTask DisposeAsync()
 764241    {
 764242        lock (_mutex)
 764243        {
 764244            if (_disposeTask is null)
 712245            {
 712246                RefuseNewInvocations("The connection was disposed.");
 247
 712248                if (_streamInputOutputCount == 0)
 688249                {
 250                    // That's only for consistency. _streamsCompleted.Task matters only to ShutdownAsync.
 688251                    _streamsCompleted.TrySetResult();
 688252                }
 712253                if (_dispatchInvocationCount == 0)
 695254                {
 695255                    _dispatchesAndInvocationsCompleted.TrySetResult();
 695256                }
 257
 712258                _shutdownTask ??= Task.CompletedTask;
 712259                _disposeTask = PerformDisposeAsync();
 712260            }
 764261        }
 764262        return new(_disposeTask);
 263
 264        async Task PerformDisposeAsync()
 712265        {
 266            // Make sure we execute the code below without holding the mutex lock.
 712267            await Task.Yield();
 268
 712269            _disposedCts.Cancel();
 270
 271            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 272
 712273            if (_connectTask is not null)
 694274            {
 275                // We wait for _dispatchesAndInvocationsCompleted (since dispatches and invocations are somewhat under
 276                // our control), but not for _streamsCompleted, since we can't make the application complete the
 277                // incoming payload pipe readers.
 278                try
 694279                {
 694280                    await Task.WhenAll(
 694281                        _connectTask,
 694282                        _acceptRequestsTask ?? Task.CompletedTask,
 694283                        _readGoAwayTask ?? Task.CompletedTask,
 694284                        _shutdownTask,
 694285                        _dispatchesAndInvocationsCompleted.Task).ConfigureAwait(false);
 127286                }
 567287                catch
 567288                {
 289                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 290                    // unexpected exceptions so there's no need to handle them here.
 567291                }
 694292            }
 293
 294            // If the application is still reading some incoming payload, the disposal of the transport connection can
 295            // abort this reading.
 712296            await _transportConnection.DisposeAsync().ConfigureAwait(false);
 297
 298            // It's safe to complete the output since write operations have been completed by the transport connection
 299            // disposal.
 712300            _controlStream?.Output.Complete();
 301
 302            // It's safe to complete the input since read operations have been completed by the transport connection
 303            // disposal.
 712304            _remoteControlStream?.Input.Complete();
 305
 712306            _dispatchSemaphore?.Dispose();
 712307            _disposedCts.Dispose();
 712308            _goAwayCts.Dispose();
 712309            _shutdownOrGoAwayCts.Dispose();
 310
 712311            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 712312        }
 764313    }
 314
 315    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 2821316    {
 2821317        if (request.Protocol != Protocol.IceRpc)
 2318        {
 2319            throw new InvalidOperationException(
 2320                $"Cannot send {request.Protocol} request on {Protocol.IceRpc} connection.");
 321        }
 322
 2819323        lock (_mutex)
 2819324        {
 2819325            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 326
 2815327            if (_refuseInvocations)
 2328            {
 2329                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 330            }
 2813331            if (_connectTask is null)
 0332            {
 0333                throw new InvalidOperationException("Cannot invoke on a connection before connecting it.");
 334            }
 2813335            if (!IsServer && !_connectTask.IsCompletedSuccessfully)
 0336            {
 0337                throw new InvalidOperationException(
 0338                    "Cannot invoke on a client connection that is not fully established.");
 339            }
 340            // It's possible but rare to invoke on a server connection that is still connecting.
 341
 2813342            if (request.ServiceAddress.Fragment.Length > 0)
 0343            {
 0344                throw new NotSupportedException("The icerpc protocol does not support fragments.");
 345            }
 346
 2813347            IncrementDispatchInvocationCount();
 2813348        }
 349
 2813350        return PerformInvokeAsync();
 351
 352        async Task<IncomingResponse> PerformInvokeAsync()
 2813353        {
 354            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 2813355            using var invocationCts = CancellationTokenSource.CreateLinkedTokenSource(
 2813356                cancellationToken,
 2813357                _disposedCts.Token);
 358
 2813359            PipeReader? streamInput = null;
 360
 361            // This try/catch block cleans up streamInput (when not null) and decrements the dispatch-invocation count.
 362            try
 2813363            {
 364                // Create the stream.
 365                IMultiplexedStream stream;
 366                try
 2813367                {
 368                    // We want to cancel CreateStreamAsync as soon as the connection is being shutdown or received a
 369                    // GoAway frame.
 2813370                    using CancellationTokenRegistration _ = _shutdownOrGoAwayCts.Token.UnsafeRegister(
 10371                        cts => ((CancellationTokenSource)cts!).Cancel(),
 2813372                        invocationCts);
 373
 2813374                    stream = await _transportConnection.CreateStreamAsync(
 2813375                        bidirectional: !request.IsOneway,
 2813376                        invocationCts.Token).ConfigureAwait(false);
 377
 2795378                    streamInput = stream.IsBidirectional ? stream.Input : null;
 2795379                }
 16380                catch (OperationCanceledException)
 16381                {
 16382                    cancellationToken.ThrowIfCancellationRequested();
 383
 384                    // Connection was shutdown or disposed and we did not read the payload at all.
 12385                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 386                }
 2387                catch (IceRpcException exception)
 2388                {
 2389                    RefuseNewInvocations("The connection was lost.");
 2390                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 391                }
 0392                catch (Exception exception)
 0393                {
 0394                    Debug.Fail($"CreateStreamAsync failed with an unexpected exception: {exception}");
 0395                    RefuseNewInvocations("The connection was lost.");
 0396                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 397                }
 398
 2795399                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(
 2795400                    OnGoAway,
 2795401                    invocationCts);
 402
 403                PipeWriter payloadWriter;
 404
 405                try
 2795406                {
 2795407                    lock (_mutex)
 2795408                    {
 2795409                        if (_refuseInvocations)
 0410                        {
 411                            // Both stream.Input and stream.Output are completed by the catch and finally blocks below.
 0412                            throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 413                        }
 414
 2795415                        IncrementStreamInputOutputCount(stream.IsBidirectional);
 416
 417                        // Decorate the stream to decrement the input/output count on Complete.
 2795418                        stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 2795419                        streamInput = stream.IsBidirectional ? stream.Input : null;
 2795420                    }
 421
 2795422                    EncodeHeader(stream.Output);
 2793423                    payloadWriter = request.GetPayloadWriter(stream.Output);
 2793424                }
 2425                catch
 2426                {
 2427                    stream.Output.CompleteOutput(success: false);
 2428                    throw;
 429                }
 430
 431                // From now on, we only use payloadWriter to write and we make sure to complete it.
 432
 2793433                bool hasContinuation = request.PayloadContinuation is not null;
 434                FlushResult flushResult;
 435
 436                try
 2793437                {
 2793438                    flushResult = await payloadWriter.CopyFromAsync(
 2793439                        request.Payload,
 2793440                        stream.WritesClosed,
 2793441                        endStream: !hasContinuation,
 2793442                        invocationCts.Token).ConfigureAwait(false);
 2777443                }
 16444                catch
 16445                {
 16446                    payloadWriter.CompleteOutput(success: false);
 16447                    request.PayloadContinuation?.Complete();
 16448                    throw;
 449                }
 450                finally
 2793451                {
 2793452                    request.Payload.Complete();
 2793453                }
 454
 2777455                if (flushResult.IsCompleted || flushResult.IsCanceled || !hasContinuation)
 2753456                {
 457                    // The remote reader doesn't want more data, or the copying was canceled, or there is no
 458                    // continuation: we're done.
 2753459                    payloadWriter.CompleteOutput(!flushResult.IsCanceled);
 2753460                    request.PayloadContinuation?.Complete();
 2753461                }
 462                else
 24463                {
 464                    // Sends the payload continuation in a background thread.
 24465                    SendRequestPayloadContinuation(
 24466                        request,
 24467                        payloadWriter,
 24468                        stream.WritesClosed,
 24469                        OnGoAway,
 24470                        invocationCts.Token);
 24471                }
 472
 2777473                if (request.IsOneway)
 2022474                {
 2022475                    return new IncomingResponse(request, _connectionContext!);
 476                }
 477
 755478                Debug.Assert(streamInput is not null);
 479
 480                try
 755481                {
 755482                    ReadResult readResult = await streamInput.ReadSegmentAsync(
 755483                        SliceEncoding.Slice2,
 755484                        _maxLocalHeaderSize,
 755485                        invocationCts.Token).ConfigureAwait(false);
 486
 487                    // Nothing cancels the stream input pipe reader.
 711488                    Debug.Assert(!readResult.IsCanceled);
 489
 711490                    if (readResult.Buffer.IsEmpty)
 0491                    {
 0492                        throw new IceRpcException(
 0493                            IceRpcError.IceRpcError,
 0494                            "Received an icerpc response with an empty header.");
 495                    }
 496
 711497                    (StatusCode statusCode, string? errorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? fieldsPipeReader) =
 711498                        DecodeHeader(readResult.Buffer);
 711499                    stream.Input.AdvanceTo(readResult.Buffer.End);
 500
 711501                    if (statusCode == StatusCode.TruncatedPayload && invocationCts.Token.IsCancellationRequested)
 0502                    {
 503                        // Canceling the sending of the payload continuation triggers the completion of the stream
 504                        // output. This may lead to a TruncatedPayload if the dispatch is currently reading the payload
 505                        // continuation. In such cases, we prioritize throwing an OperationCanceledException.
 0506                        fieldsPipeReader?.Complete();
 0507                        invocationCts.Token.ThrowIfCancellationRequested();
 0508                    }
 509
 711510                    var response = new IncomingResponse(
 711511                        request,
 711512                        _connectionContext!,
 711513                        statusCode,
 711514                        errorMessage,
 711515                        fields,
 711516                        fieldsPipeReader)
 711517                    {
 711518                        Payload = streamInput
 711519                    };
 520
 711521                    streamInput = null; // response now owns the stream input
 711522                    return response;
 523                }
 2524                catch (InvalidDataException exception)
 2525                {
 2526                    throw new IceRpcException(
 2527                        IceRpcError.IceRpcError,
 2528                        "Received an icerpc response with an invalid header.",
 2529                        exception);
 530                }
 531
 532                void OnGoAway(object? cts)
 26533                {
 26534                    if (!stream.IsStarted ||
 26535                        stream.Id >=
 26536                            (stream.IsBidirectional ?
 26537                                _goAwayFrame.BidirectionalStreamId :
 26538                                _goAwayFrame.UnidirectionalStreamId))
 8539                    {
 540                        // The request wasn't received by the peer so it's safe to cancel the invocation.
 8541                        ((CancellationTokenSource)cts!).Cancel();
 8542                    }
 26543                }
 544            }
 32545            catch (OperationCanceledException exception) when (exception.CancellationToken == invocationCts.Token)
 26546            {
 26547                cancellationToken.ThrowIfCancellationRequested();
 548
 12549                if (_disposedCts.IsCancellationRequested)
 6550                {
 551                    // DisposeAsync aborted the request.
 6552                    throw new IceRpcException(IceRpcError.OperationAborted);
 553                }
 554                else
 6555                {
 6556                    Debug.Assert(_goAwayCts.IsCancellationRequested);
 6557                    throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 558                }
 559            }
 560            finally
 2813561            {
 2813562                streamInput?.Complete();
 2813563                DecrementDispatchInvocationCount();
 2813564            }
 565
 566            static (StatusCode StatusCode, string? ErrorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 567                ReadOnlySequence<byte> buffer)
 711568            {
 711569                var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 570
 711571                StatusCode statusCode = decoder.DecodeStatusCode();
 711572                string? errorMessage = statusCode == StatusCode.Ok ? null : decoder.DecodeString();
 573
 711574                (IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 711575                    DecodeFieldDictionary(
 711576                        ref decoder,
 715577                        (ref SliceDecoder decoder) => decoder.DecodeResponseFieldKey());
 578
 711579                return (statusCode, errorMessage, fields, pipeReader);
 711580            }
 581
 582            void EncodeHeader(PipeWriter streamOutput)
 2795583            {
 2795584                var encoder = new SliceEncoder(streamOutput, SliceEncoding.Slice2);
 585
 586                // Write the IceRpc request header.
 2795587                Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 588
 589                // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 2795590                long headerStartPos = streamOutput.UnflushedBytes;
 591
 2795592                var header = new IceRpcRequestHeader(request.ServiceAddress.Path, request.Operation);
 593
 2795594                header.Encode(ref encoder);
 595
 2795596                EncodeFieldDictionary(
 2795597                    request.Fields,
 15598                    (ref SliceEncoder encoder, RequestFieldKey key) => encoder.EncodeRequestFieldKey(key),
 2795599                    ref encoder,
 2795600                    streamOutput);
 601
 602                // We're done with the header encoding, write the header size.
 2795603                int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 2795604                CheckPeerHeaderSize(headerSize);
 2793605                SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 2793606            }
 2733607        }
 2813608    }
 609
 610    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 181611    {
 181612        lock (_mutex)
 181613        {
 181614            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 615
 174616            if (_shutdownTask is not null)
 4617            {
 4618                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 619            }
 170620            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 6621            {
 6622                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 623            }
 624
 164625            RefuseNewInvocations("The connection was shut down.");
 626
 164627            if (_streamInputOutputCount == 0)
 119628            {
 119629                _streamsCompleted.TrySetResult();
 119630            }
 164631            if (_dispatchInvocationCount == 0)
 115632            {
 115633                _dispatchesAndInvocationsCompleted.TrySetResult();
 115634            }
 635
 164636            _shutdownTask = PerformShutdownAsync();
 164637        }
 164638        return _shutdownTask;
 639
 640        async Task PerformShutdownAsync()
 164641        {
 164642            await Task.Yield(); // exit mutex lock
 643
 164644            _shutdownOrGoAwayCts.Cancel();
 645
 646            try
 164647            {
 164648                Debug.Assert(_acceptRequestsTask is not null);
 164649                Debug.Assert(_controlStream is not null);
 164650                Debug.Assert(_readGoAwayTask is not null);
 164651                Debug.Assert(_remoteControlStream is not null);
 652
 164653                await _acceptRequestsTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 654
 146655                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 656
 657                // Once _shutdownTask is not null, _lastRemoteBidirectionalStreamId and _lastRemoteUnidirectionalStreamId
 658                // are immutable.
 659
 660                // When this peer is the server endpoint, the first accepted bidirectional stream ID is 0. When this
 661                // peer is the client endpoint, the first accepted bidirectional stream ID is 1.
 146662                IceRpcGoAway goAwayFrame = new(
 146663                    _lastRemoteBidirectionalStreamId is ulong value ? value + 4 : (IsServer ? 0ul : 1ul),
 146664                    (_lastRemoteUnidirectionalStreamId ?? _remoteControlStream.Id) + 4);
 665
 666                try
 146667                {
 146668                    _ = await SendControlFrameAsync(
 146669                        IceRpcControlFrameType.GoAway,
 146670                        goAwayFrame.Encode,
 146671                        cts.Token).ConfigureAwait(false);
 672
 673                    // Wait for the peer to send back a GoAway frame. The task should already be completed if the
 674                    // shutdown was initiated by the peer.
 142675                    await _readGoAwayTask.WaitAsync(cts.Token).ConfigureAwait(false);
 676
 677                    // Wait for all streams (other than the control streams) to have their Input and Output completed.
 122678                    await _streamsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 679
 680                    // Close the control stream to notify the peer that on our side, all the streams completed and that
 681                    // it can close the transport connection whenever it likes.
 120682                    _controlStream.Output.CompleteOutput(success: true);
 120683                }
 26684                catch
 26685                {
 686                    // If we fail to send the GoAway frame or some other failure occur (such as
 687                    // OperationCanceledException) we are in an abortive closure and we close Output to allow
 688                    // the peer to continue if it's waiting for us.
 26689                    _controlStream.Output.CompleteOutput(success: false);
 26690                    throw;
 691                }
 692
 693                // Wait for the peer notification that on its side all the streams are completed. It's important to wait
 694                // for this notification before closing the connection. In particular with Quic where closing the
 695                // connection before all the streams are processed could lead to a stream failure.
 696                try
 120697                {
 698                    // Wait for the _remoteControlStream Input completion.
 120699                    ReadResult readResult = await _remoteControlStream.Input.ReadAsync(cts.Token).ConfigureAwait(false);
 700
 115701                    Debug.Assert(!readResult.IsCanceled);
 702
 115703                    if (!readResult.IsCompleted || !readResult.Buffer.IsEmpty)
 0704                    {
 0705                        throw new IceRpcException(
 0706                            IceRpcError.IceRpcError,
 0707                            "Received bytes on the control stream after receiving the GoAway frame.");
 708                    }
 709
 710                    // We can now safely close the connection.
 115711                    await _transportConnection.CloseAsync(MultiplexedConnectionCloseError.NoError, cts.Token)
 115712                        .ConfigureAwait(false);
 114713                }
 5714                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.ConnectionClosedByPeer)
 1715                {
 716                    // Expected if the peer closed the connection first.
 1717                }
 718
 719                // We wait for the completion of the dispatches that we created (and, secondarily, invocations).
 115720                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 115721            }
 17722            catch (OperationCanceledException)
 17723            {
 17724                cancellationToken.ThrowIfCancellationRequested();
 725
 7726                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 7727                throw new IceRpcException(
 7728                    IceRpcError.OperationAborted,
 7729                    "The connection shutdown was aborted because the connection was disposed.");
 730            }
 0731            catch (InvalidDataException exception)
 0732            {
 0733                throw new IceRpcException(
 0734                    IceRpcError.IceRpcError,
 0735                    "The connection shutdown was aborted by an icerpc protocol error.",
 0736                    exception);
 737            }
 32738            catch (IceRpcException)
 32739            {
 32740                throw;
 741            }
 0742            catch (Exception exception)
 0743            {
 0744                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0745                throw;
 746            }
 115747        }
 164748    }
 749
 /// <summary>Constructs an icerpc protocol connection on top of a multiplexed transport connection.</summary>
 /// <param name="transportConnection">The multiplexed transport connection, stored in _transportConnection.</param>
 /// <param name="transportConnectionInformation">The transport connection information, stored as is; IsServer
 /// returns true when it is not null.</param>
 /// <param name="options">The connection options; this constructor reads Dispatcher, MaxIceRpcHeaderSize,
 /// MaxDispatches and InactivityTimeout.</param>
 /// <param name="taskExceptionObserver">The optional task exception observer, stored as is.</param>
 712750    internal IceRpcProtocolConnection(
 712751        IMultiplexedConnection transportConnection,
 712752        TransportConnectionInformation? transportConnectionInformation,
 712753        ConnectionOptions options,
 712754        ITaskExceptionObserver? taskExceptionObserver)
 712755    {
        // This token source is canceled when either _disposedCts or _goAwayCts is canceled; ShutdownAsync also
        // cancels it directly.
 712756        _shutdownOrGoAwayCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, _goAwayCts.Token);
 757
 712758        _taskExceptionObserver = taskExceptionObserver;
 759
 712760        _transportConnection = transportConnection;
 712761        _dispatcher = options.Dispatcher;
 712762        _maxLocalHeaderSize = options.MaxIceRpcHeaderSize;
 712763        _transportConnectionInformation = transportConnectionInformation;
 764
        // The dispatch semaphore is created only when MaxDispatches is greater than 0; otherwise _dispatchSemaphore
        // keeps its default value and the dispatch code skips the wait.
 712765        if (options.MaxDispatches > 0)
 712766        {
 712767            _dispatchSemaphore = new SemaphoreSlim(
 712768                initialCount: options.MaxDispatches,
 712769                maxCount: options.MaxDispatches);
 712770        }
 771
 712772        _inactivityTimeout = options.InactivityTimeout;
        // The timer callback requests a shutdown when no shutdown is in progress and both the dispatch-invocation
        // count and the stream input/output count are 0.
        // NOTE(review): the timer is only constructed here; presumably ScheduleInactivityCheck arms it - confirm.
 712773        _inactivityTimeoutTimer = new Timer(_ =>
 10774        {
 10775            bool requestShutdown = false;
 712776
 10777            lock (_mutex)
 10778            {
 10779                if (_shutdownTask is null && _dispatchInvocationCount == 0 && _streamInputOutputCount == 0)
 10780                {
 10781                    requestShutdown = true;
 10782                    RefuseNewInvocations(
 10783                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 10784                }
 10785            }
 712786
 10787            if (requestShutdown)
 10788            {
 712789                // TrySetResult must be called outside the mutex lock
 10790                _shutdownRequestedTcs.TrySetResult();
 10791            }
 722792        });
 712793    }
 794
 795    private static (IDictionary<TKey, ReadOnlySequence<byte>>, PipeReader?) DecodeFieldDictionary<TKey>(
 796        ref SliceDecoder decoder,
 797        DecodeFunc<TKey> decodeKeyFunc) where TKey : struct
 3480798    {
 3480799        int count = decoder.DecodeSize();
 800
 801        IDictionary<TKey, ReadOnlySequence<byte>> fields;
 802        PipeReader? pipeReader;
 3480803        if (count == 0)
 3461804        {
 3461805            fields = ImmutableDictionary<TKey, ReadOnlySequence<byte>>.Empty;
 3461806            pipeReader = null;
 3461807            decoder.CheckEndOfBuffer();
 3461808        }
 809        else
 19810        {
 811            // We don't use the normal collection allocation check here because SizeOf<ReadOnlySequence<byte>> is quite
 812            // large (24).
 813            // For example, say we decode a fields dictionary with a single field with an empty value. It's encoded
 814            // using 1 byte (dictionary size) + 1 byte (key) + 1 byte (value size) = 3 bytes. The decoder's default max
 815            // allocation size is 3 * 8 = 24. If we simply call IncreaseCollectionAllocation(1 * (4 + 24)), we'll exceed
 816            // the default collection allocation limit. (sizeof TKey is currently 4 but could/should increase to 8).
 817
 818            // Each field consumes at least 2 bytes: 1 for the key and one for the value size.
 19819            if (count * 2 > decoder.Remaining)
 0820            {
 0821                throw new InvalidDataException("Too many fields.");
 822            }
 823
 19824            fields = new Dictionary<TKey, ReadOnlySequence<byte>>(count);
 19825            var pipe = new Pipe();
 19826            decoder.CopyTo(pipe.Writer);
 19827            pipe.Writer.Complete();
 828
 829            try
 19830            {
 19831                _ = pipe.Reader.TryRead(out ReadResult readResult);
 19832                var fieldsDecoder = new SliceDecoder(readResult.Buffer, SliceEncoding.Slice2);
 833
 76834                for (int i = 0; i < count; ++i)
 19835                {
 836                    // Decode the field key.
 19837                    TKey key = decodeKeyFunc(ref fieldsDecoder);
 838
 839                    // Decode and check the field value size.
 840                    int valueSize;
 841                    try
 19842                    {
 19843                        valueSize = checked((int)fieldsDecoder.DecodeVarUInt62());
 19844                    }
 0845                    catch (OverflowException exception)
 0846                    {
 0847                        throw new InvalidDataException("The field size can't be larger than int.MaxValue.", exception);
 848                    }
 849
 19850                    if (valueSize > fieldsDecoder.Remaining)
 0851                    {
 0852                        throw new InvalidDataException(
 0853                            $"The value of field '{key}' extends beyond the end of the buffer.");
 854                    }
 855
 856                    // Create a ROS reference to the field value by slicing the fields pipe reader ROS.
 19857                    ReadOnlySequence<byte> value = readResult.Buffer.Slice(fieldsDecoder.Consumed, valueSize);
 19858                    fields.Add(key, value);
 859
 860                    // Skip the field value to prepare the decoder to read the next field value.
 19861                    fieldsDecoder.Skip(valueSize);
 19862                }
 19863                fieldsDecoder.CheckEndOfBuffer();
 864
 19865                pipe.Reader.AdvanceTo(readResult.Buffer.Start); // complete read without consuming anything
 866
 19867                pipeReader = pipe.Reader;
 19868            }
 0869            catch
 0870            {
 0871                pipe.Reader.Complete();
 0872                throw;
 873            }
 19874        }
 875
 876        // The caller is responsible for completing the pipe reader.
 3480877        return (fields, pipeReader);
 3480878    }
 879
 /// <summary>Accepts streams from the transport connection until <paramref name="cancellationToken"/> is canceled
 /// and starts a separate task running DispatchRequestAsync for each accepted stream.</summary>
 /// <param name="cancellationToken">The cancellation token checked before each iteration and passed to
 /// AcceptStreamAsync.</param>
 /// <remarks>An <see cref="OperationCanceledException"/> ends the loop silently. Any other exception is rethrown
 /// after refusing new invocations and completing _shutdownRequestedTcs.</remarks>
 880    private async Task AcceptRequestsAsync(CancellationToken cancellationToken)
 567881    {
 567882        await Task.Yield(); // exit mutex lock
 883
 884        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 885        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 886        // _connectTask has completed successfully. The creation of the _acceptRequestsTask is the last action taken by
 887        // _connectTask and as a result this await can't fail.
 567888        await _connectTask!.ConfigureAwait(false);
 889
 890        try
 567891        {
 892            // We check the cancellation token for each iteration because we want to exit the accept requests loop as
 893            // soon as ShutdownAsync/GoAway requests this cancellation, even when more streams can be accepted without
 894            // waiting.
 3354895            while (!cancellationToken.IsCancellationRequested)
 3354896            {
 897                // When _dispatcher is null, the multiplexed connection MaxUnidirectionalStreams and
 898                // MaxBidirectionalStreams options are configured to not accept any request-stream from the peer. As a
 899                // result, when _dispatcher is null, this call will block indefinitely until the cancellation token is
 900                // canceled by ShutdownAsync, GoAway or DisposeAsync.
 3354901                IMultiplexedStream stream = await _transportConnection.AcceptStreamAsync(cancellationToken)
 3354902                    .ConfigureAwait(false);
 903
 2787904                lock (_mutex)
 2787905                {
 906                    // We don't want to increment _dispatchInvocationCount/_streamInputOutputCount when the connection
 907                    // is shutting down or being disposed.
 2787908                    if (_shutdownTask is not null)
 0909                    {
 910                        // Note that cancellationToken may not be canceled yet at this point.
 0911                        throw new OperationCanceledException();
 912                    }
 913
 914                    // The logic in IncrementStreamInputOutputCount requires that we increment the dispatch-invocation
 915                    // count first.
 2787916                    IncrementDispatchInvocationCount();
 2787917                    IncrementStreamInputOutputCount(stream.IsBidirectional);
 918
 919                    // Decorate the stream to decrement the stream input/output count on Complete.
 2787920                    stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 921
 922                    // The multiplexed connection guarantees that the IDs of accepted streams of a given type have
 923                    // ever-increasing values, so the last accepted ID of each type is recorded for the GoAway frame.
 924
 2787925                    if (stream.IsBidirectional)
 767926                    {
 767927                        _lastRemoteBidirectionalStreamId = stream.Id;
 767928                    }
 929                    else
 2020930                    {
 2020931                        _lastRemoteUnidirectionalStreamId = stream.Id;
 2020932                    }
 2787933                }
 934
 935                // Start a task to read the stream and dispatch the request. We pass CancellationToken.None to Task.Run
 936                // because DispatchRequestAsync must clean-up the stream and the dispatch-invocation count.
 5574937                _ = Task.Run(() => DispatchRequestAsync(stream), CancellationToken.None);
 2787938            }
 0939        }
 415940        catch (OperationCanceledException)
 415941        {
 942            // Expected, the associated cancellation token source was canceled.
 415943        }
 152944        catch (IceRpcException)
 152945        {
 152946            RefuseNewInvocations("The connection was lost");
 152947            _ = _shutdownRequestedTcs.TrySetResult();
 152948            throw;
 949        }
 0950        catch (Exception exception)
 0951        {
 0952            Debug.Fail($"The accept stream task failed with an unexpected exception: {exception}");
 0953            RefuseNewInvocations("The connection was lost");
 0954            _ = _shutdownRequestedTcs.TrySetResult();
 0955            throw;
 956        }
 415957    }
 958
 /// <summary>Verifies that an encoded header does not exceed _maxPeerHeaderSize.</summary>
 /// <param name="headerSize">The size of the encoded header.</param>
 /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.LimitExceeded"/> when
 /// <paramref name="headerSize"/> is greater than _maxPeerHeaderSize.</exception>
 959    private void CheckPeerHeaderSize(int headerSize)
 3528960    {
 3528961        if (headerSize > _maxPeerHeaderSize)
 4962        {
 4963            throw new IceRpcException(
 4964                IceRpcError.LimitExceeded,
 4965                $"The header size ({headerSize}) for an icerpc request or response is greater than the peer's max header
 966        }
 3524967    }
 968
 969    private void DecrementDispatchInvocationCount()
 5624970    {
 5624971        lock (_mutex)
 5624972        {
 5624973            if (--_dispatchInvocationCount == 0)
 844974            {
 844975                if (_shutdownTask is not null)
 60976                {
 60977                    _dispatchesAndInvocationsCompleted.TrySetResult();
 60978                }
 784979                else if (!_refuseInvocations && _streamInputOutputCount == 0)
 616980                {
 616981                    ScheduleInactivityCheck();
 616982                }
 844983            }
 5624984        }
 5624985    }
 986
 987    /// <summary>Decrements the stream input/output count.</summary>
 988    private void DecrementStreamInputOutputCount()
 7112989    {
 7112990        lock (_mutex)
 7112991        {
 7112992            if (--_streamInputOutputCount == 0)
 825993            {
 825994                if (_shutdownTask is not null)
 55995                {
 55996                    _streamsCompleted.TrySetResult();
 55997                }
 770998                else if (!_refuseInvocations && _dispatchInvocationCount == 0)
 147999                {
 1000                    // We enable the inactivity check in order to complete _shutdownRequestedTcs when inactive for too
 1001                    // long. _refuseInvocations is true when the connection is either about to be "shutdown requested",
 1002                    // or shut down / disposed. We don't need to complete _shutdownRequestedTcs in any of these
 1003                    // situations.
 1471004                    ScheduleInactivityCheck();
 1471005                }
 8251006            }
 71121007        }
 71121008    }
 1009
 /// <summary>Reads and decodes the request header from <paramref name="stream"/>, dispatches the request and, for
 /// a bidirectional stream, writes the response to the stream output.</summary>
 /// <param name="stream">The accepted stream that carries the request.</param>
 /// <remarks>The dispatch-invocation count is decremented in the finally block, whatever the outcome. When
 /// _taskExceptionObserver is not null, failures are reported to it instead of being rethrown.</remarks>
 1010    private async Task DispatchRequestAsync(IMultiplexedStream stream)
 27871011    {
 1012        // _disposedCts is not disposed since we own a dispatch count.
 27871013        CancellationToken cancellationToken = stream.IsBidirectional ?
 27871014            stream.WritesClosed.AsCancellationToken(_disposedCts.Token) :
 27871015            _disposedCts.Token;
 1016
 27871017        PipeReader? fieldsPipeReader = null;
 1018        IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields;
 1019        IceRpcRequestHeader header;
 1020
        // streamInput is set to null once the request owns it; streamOutput is null for a oneway request. The finally
        // block below only completes what is still non-null here when the dispatch did not succeed.
 27871021        PipeReader? streamInput = stream.Input;
 27871022        PipeWriter? streamOutput = stream.IsBidirectional ? stream.Output : null;
 27871023        bool success = false;
 1024
 1025        try
 27871026        {
 1027            try
 27871028            {
 27871029                ReadResult readResult = await streamInput.ReadSegmentAsync(
 27871030                    SliceEncoding.Slice2,
 27871031                    _maxLocalHeaderSize,
 27871032                    cancellationToken).ConfigureAwait(false);
 1033
 27711034                if (readResult.Buffer.IsEmpty)
 21035                {
 21036                    throw new IceRpcException(IceRpcError.IceRpcError, "Received icerpc request with empty header.");
 1037                }
 1038
 27691039                (header, fields, fieldsPipeReader) = DecodeHeader(readResult.Buffer);
 27691040                streamInput.AdvanceTo(readResult.Buffer.End);
 27691041            }
 61042            catch (InvalidDataException exception)
 61043            {
 61044                var rpcException = new IceRpcException(
 61045                    IceRpcError.IceRpcError,
 61046                    "Received invalid icerpc request header.",
 61047                    exception);
 1048
 61049                if (_taskExceptionObserver is null)
 21050                {
 21051                    throw rpcException;
 1052                }
 1053                else
 41054                {
 41055                    _taskExceptionObserver.DispatchRefused(
 41056                        _connectionContext!.TransportConnectionInformation,
 41057                        rpcException);
 41058                    return; // success remains false
 1059                }
 1060            }
 121061            catch (Exception exception) when (_taskExceptionObserver is not null)
 81062            {
 81063                _taskExceptionObserver.DispatchRefused(_connectionContext!.TransportConnectionInformation, exception);
 81064                return; // success remains false
 1065            }
 1066
 27691067            using var request = new IncomingRequest(Protocol.IceRpc, _connectionContext!)
 27691068            {
 27691069                Fields = fields,
 27691070                IsOneway = !stream.IsBidirectional,
 27691071                Operation = header.Operation,
 27691072                Path = header.Path,
 27691073                Payload = streamInput
 27691074            };
 1075
 27691076            streamInput = null; // the request now owns streamInput
 1077
 1078            try
 27691079            {
 27691080                OutgoingResponse response = await PerformDispatchRequestAsync(request, cancellationToken)
 27691081                    .ConfigureAwait(false);
 1082
 27531083                if (!request.IsOneway)
 7351084                {
 7351085                    Debug.Assert(streamOutput is not null);
 7351086                    EncodeHeader(response);
 1087
 7311088                    PipeWriter payloadWriter = response.GetPayloadWriter(streamOutput);
 1089
 1090                    // We give flushResult an initial "failed" value, in case the first CopyFromAsync throws.
 7311091                    var flushResult = new FlushResult(isCanceled: true, isCompleted: false);
 1092
 1093                    try
 7311094                    {
 1095                        // We don't use cancellationToken here because it's canceled shortly afterwards by the
 1096                        // completion of writesClosed. This works around https://github.com/dotnet/runtime/issues/82704
 1097                        // where the stream would otherwise be aborted after the successful write. It's also fine to
 1098                        // just use _disposedCts.Token: if writes are closed because the peer is no longer interested
 1099                        // in the response, the write operations will raise an IceRpcException(StreamAborted) which is
 1100                        // ignored.
 7311101                        bool hasContinuation = response.PayloadContinuation is not null;
 1102
 7311103                        flushResult = await payloadWriter.CopyFromAsync(
 7311104                            response.Payload,
 7311105                            stream.WritesClosed,
 7311106                            endStream: !hasContinuation,
 7311107                            _disposedCts.Token).ConfigureAwait(false);
 1108
 7221109                        if (!flushResult.IsCompleted && !flushResult.IsCanceled && hasContinuation)
 41110                        {
 41111                            flushResult = await payloadWriter.CopyFromAsync(
 41112                                response.PayloadContinuation!,
 41113                                stream.WritesClosed,
 41114                                endStream: true,
 41115                                _disposedCts.Token).ConfigureAwait(false);
 21116                        }
 7201117                    }
 1118                    finally
 7311119                    {
 7311120                        payloadWriter.CompleteOutput(success: !flushResult.IsCanceled);
 7311121                        response.Payload.Complete();
 7311122                        response.PayloadContinuation?.Complete();
 7311123                    }
 7201124                }
 27381125            }
 311126            catch (Exception exception) when (_taskExceptionObserver is not null)
 151127            {
 151128                _taskExceptionObserver.DispatchFailed(
 151129                    request,
 151130                    _connectionContext!.TransportConnectionInformation,
 151131                    exception);
 151132                return; // success remains false
 1133            }
 27381134            success = true;
 27381135        }
 21136        catch (IceRpcException)
 21137        {
 1138            // Expected, with for example:
 1139            //  - IceRpcError.ConnectionAborted when the peer aborts the connection
 1140            //  - IceRpcError.IceRpcError when the request header is invalid
 1141            //  - IceRpcError.TruncatedData when the request header is truncated
 21142        }
 201143        catch (OperationCanceledException exception) when (
 201144            exception.CancellationToken == cancellationToken ||
 201145            exception.CancellationToken == _disposedCts.Token)
 201146        {
 1147            // Expected if the dispatch is canceled by the peer or the connection is disposed.
 201148        }
 01149        catch (Exception exception)
 01150        {
 1151            // This exception is unexpected when running the IceRPC test suite. A test that expects this exception must
 1152            // install a task exception observer.
 01153            Debug.Fail($"icerpc dispatch failed with an unexpected exception: {exception}");
 1154
 1155            // Generate unobserved task exception (UTE). If this exception is expected (e.g. an expected payload read
 1156            // exception) and the application wants to avoid this UTE, it must configure a non-null logger to install
 1157            // a task exception observer.
 01158            throw;
 1159        }
 1160        finally
 27871161        {
 27871162            if (!success)
 491163            {
 1164                // We always need to complete streamOutput when an exception is thrown. For example, we received an
 1165                // invalid request header that we could not decode.
 491166                streamOutput?.CompleteOutput(success: false);
 491167                streamInput?.Complete();
 491168            }
 27871169            fieldsPipeReader?.Complete();
 1170
 27871171            DecrementDispatchInvocationCount();
 27871172        }
 1173
        // Dispatches the request with _dispatcher, holding a slot of _dispatchSemaphore (when there is one) for the
        // duration of the dispatch. A cancellation carrying cancellationToken is rethrown; any other exception is
        // converted into a response built from a DispatchException.
 1174        async Task<OutgoingResponse> PerformDispatchRequestAsync(
 1175            IncomingRequest request,
 1176            CancellationToken cancellationToken)
 27691177        {
 27691178            Debug.Assert(_dispatcher is not null);
 1179
 1180            OutgoingResponse response;
 1181
 1182            try
 27691183            {
 27691184                if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 27691185                {
 27691186                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 27691187                }
 1188
 1189                try
 27691190                {
 27691191                    response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 27351192                }
 1193                finally
 27691194                {
 27691195                    _dispatchSemaphore?.Release();
 27691196                }
 1197
 27351198                if (response != request.Response)
 21199                {
 21200                    throw new InvalidOperationException(
 21201                        "The dispatcher did not return the last response created for this request.");
 1202                }
 27331203            }
 181204            catch (OperationCanceledException exception) when (cancellationToken == exception.CancellationToken)
 161205            {
 161206                throw;
 1207            }
 201208            catch (Exception exception)
 201209            {
 201210                if (exception is not DispatchException dispatchException)
 161211                {
                    // Map the exception to a status code; anything not listed becomes an internal error.
 161212                    StatusCode statusCode = exception switch
 161213                    {
 21214                        InvalidDataException => StatusCode.InvalidData,
 81215                        IceRpcException iceRpcException when iceRpcException.IceRpcError == IceRpcError.TruncatedData =>
 81216                            StatusCode.TruncatedPayload,
 61217                        _ => StatusCode.InternalError
 161218                    };
 161219                    dispatchException = new DispatchException(statusCode, message: null, exception);
 161220                }
 201221                response = dispatchException.ToOutgoingResponse(request);
 201222            }
 1223
 27531224            return response;
 27531225        }
 1226
        // Decodes the request header followed by its fields dictionary from the header buffer. The returned pipe
        // reader is the one produced by DecodeFieldDictionary.
 1227        static (IceRpcRequestHeader, IDictionary<RequestFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 1228            ReadOnlySequence<byte> buffer)
 27691229        {
 27691230            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 27691231            var header = new IceRpcRequestHeader(ref decoder);
 27691232            (IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 27691233                DecodeFieldDictionary(
 27691234                    ref decoder,
 27841235                    (ref SliceDecoder decoder) => decoder.DecodeRequestFieldKey());
 1236
 27691237            return (header, fields, pipeReader);
 27691238        }
 1239
        // Encodes the response header (size placeholder, status code, error message when the status code is greater
        // than Ok, fields) to streamOutput, then checks the header size and writes it into the placeholder.
 1240        void EncodeHeader(OutgoingResponse response)
 7351241        {
 7351242            var encoder = new SliceEncoder(streamOutput, SliceEncoding.Slice2);
 1243
 1244            // Write the IceRpc response header.
 7351245            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 1246
 1247            // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 7351248            long headerStartPos = streamOutput.UnflushedBytes;
 1249
 7351250            encoder.EncodeStatusCode(response.StatusCode);
 7351251            if (response.StatusCode > StatusCode.Ok)
 501252            {
 501253                encoder.EncodeString(response.ErrorMessage!);
 501254            }
 1255
 7351256            EncodeFieldDictionary(
 7351257                response.Fields,
 81258                (ref SliceEncoder encoder, ResponseFieldKey key) => encoder.EncodeResponseFieldKey(key),
 7351259                ref encoder,
 7351260                streamOutput);
 1261
 1262            // We're done with the header encoding, write the header size.
 7331263            int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 7331264            CheckPeerHeaderSize(headerSize);
 7311265            SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 7311266        }
 27871267    }
 1268
 1269    /// <summary>Encodes the fields dictionary at the end of a request or response header.</summary>
 1270    /// <remarks>This method can write bytes directly to <paramref name="output"/> without going through
 1271    /// <paramref name="encoder"/>.</remarks>
 1272    private void EncodeFieldDictionary<TKey>(
 1273        IDictionary<TKey, OutgoingFieldValue> fields,
 1274        EncodeAction<TKey> encodeKeyAction,
 1275        ref SliceEncoder encoder,
 1276        PipeWriter output) where TKey : struct =>
 35301277        encoder.EncodeDictionary(
 35301278            fields,
 35301279            encodeKeyAction,
 35301280            (ref SliceEncoder encoder, OutgoingFieldValue value) =>
 231281                {
 231282                    if (value.WriteAction is Action<IBufferWriter<byte>> writeAction)
 121283                    {
 121284                        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 121285                        long startPos = output.UnflushedBytes;
 121286                        writeAction(output);
 101287                        SliceEncoder.EncodeVarUInt62((ulong)(output.UnflushedBytes - startPos), sizePlaceholder);
 101288                    }
 35301289                    else
 111290                    {
 111291                        encoder.EncodeSize(checked((int)value.ByteSequence.Length));
 111292                        encoder.WriteByteSequence(value.ByteSequence);
 111293                    }
 35511294                });
 1295
 1296    /// <summary>Increments the dispatch-invocation count.</summary>
 1297    /// <remarks>This method must be called with _mutex locked.</remarks>
 1298    private void IncrementDispatchInvocationCount()
 56241299    {
 56241300        if (_dispatchInvocationCount++ == 0 && _streamInputOutputCount == 0)
 8441301        {
 1302            // Cancel inactivity check.
 8441303            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 8441304        }
 56241305    }
 1306
 1307    /// <summary>Increments the stream input/output count.</summary>
 1308    /// <remarks>This method must be called with _mutex locked.</remarks>
 1309    private void IncrementStreamInputOutputCount(bool bidirectional)
 55821310    {
 55821311        Debug.Assert(_dispatchInvocationCount > 0);
 55821312        _streamInputOutputCount += bidirectional ? 2 : 1;
 55821313    }
 1314
 1315    private async Task ReadGoAwayAsync(CancellationToken cancellationToken)
 5671316    {
 5671317        await Task.Yield(); // exit mutex lock
 1318
 1319        // Wait for _connectTask (which spawned the task running this method) to complete. This await can't fail.
 1320        // This guarantees this method won't request a shutdown until after _connectTask completed successfully.
 5671321        await _connectTask!.ConfigureAwait(false);
 1322
 5671323        PipeReader remoteInput = _remoteControlStream!.Input!;
 1324
 1325        try
 5671326        {
 1327            // Wait to receive the GoAway frame.
 5671328            await ReceiveControlFrameHeaderAsync(IceRpcControlFrameType.GoAway, cancellationToken)
 5671329                .ConfigureAwait(false);
 1330
 1401331            ReadResult readResult = await remoteInput.ReadSegmentAsync(
 1401332                SliceEncoding.Slice2,
 1401333                MaxGoAwayFrameBodySize,
 1401334                cancellationToken).ConfigureAwait(false);
 1335
 1336            // We don't call CancelPendingRead on remoteInput
 1361337            Debug.Assert(!readResult.IsCanceled);
 1338
 1339            try
 1361340            {
 1361341                _goAwayFrame = SliceEncoding.Slice2.DecodeBuffer(
 1361342                    readResult.Buffer,
 2721343                    (ref SliceDecoder decoder) => new IceRpcGoAway(ref decoder));
 1341344            }
 1345            finally
 1361346            {
 1361347                remoteInput.AdvanceTo(readResult.Buffer.End);
 1361348            }
 1349
 1341350            RefuseNewInvocations("The connection was shut down because it received a GoAway frame from the peer.");
 1341351            _goAwayCts.Cancel();
 1341352            _ = _shutdownRequestedTcs.TrySetResult();
 1341353        }
 2591354        catch (OperationCanceledException)
 2591355        {
 1356            // The connection is disposed and we let this exception cancel the task.
 2591357            throw;
 1358        }
 1641359        catch (IceRpcException)
 1641360        {
 1361            // We let the task complete with this expected exception.
 1641362            throw;
 1363        }
 101364        catch (InvalidDataException exception)
 101365        {
 1366            // "expected" in the sense it should not trigger a Debug.Fail.
 101367            throw new IceRpcException(
 101368                IceRpcError.IceRpcError,
 101369                "The ReadGoAway task was aborted by an icerpc protocol error.",
 101370                exception);
 1371        }
 01372        catch (Exception exception)
 01373        {
 01374            Debug.Fail($"The read go away task failed with an unexpected exception: {exception}");
 01375            throw;
 1376        }
 1341377    }
 1378
 1379    private async ValueTask ReceiveControlFrameHeaderAsync(
 1380        IceRpcControlFrameType expectedFrameType,
 1381        CancellationToken cancellationToken)
 11571382    {
 11571383        ReadResult readResult = await _remoteControlStream!.Input.ReadAsync(cancellationToken).ConfigureAwait(false);
 1384
 1385        // We don't call CancelPendingRead on _remoteControlStream.Input.
 7251386        Debug.Assert(!readResult.IsCanceled);
 1387
 7251388        if (readResult.Buffer.IsEmpty)
 21389        {
 21390            throw new InvalidDataException(
 21391                "Failed to read the frame type because no more data is available from the control stream.");
 1392        }
 1393
 7231394        var frameType = (IceRpcControlFrameType)readResult.Buffer.FirstSpan[0];
 7231395        if (frameType != expectedFrameType)
 61396        {
 61397            throw new InvalidDataException($"Received frame type {frameType} but expected {expectedFrameType}.");
 1398        }
 7171399        _remoteControlStream!.Input.AdvanceTo(readResult.Buffer.GetPosition(1));
 7171400    }
 1401
 1402    private async ValueTask ReceiveSettingsFrameBody(CancellationToken cancellationToken)
 5771403    {
 1404        // We are still in the single-threaded initialization at this point.
 1405
 5771406        PipeReader input = _remoteControlStream!.Input;
 5771407        ReadResult readResult = await input.ReadSegmentAsync(
 5771408            SliceEncoding.Slice2,
 5771409            MaxSettingsFrameBodySize,
 5771410            cancellationToken).ConfigureAwait(false);
 1411
 1412        // We don't call CancelPendingRead on _remoteControlStream.Input
 5751413        Debug.Assert(!readResult.IsCanceled);
 1414
 1415        try
 5751416        {
 5751417            IceRpcSettings settings = SliceEncoding.Slice2.DecodeBuffer(
 5751418                readResult.Buffer,
 11501419                (ref SliceDecoder decoder) => new IceRpcSettings(ref decoder));
 1420
 5691421            if (settings.Value.TryGetValue(IceRpcSettingKey.MaxHeaderSize, out ulong value))
 61422            {
 1423                // a varuint62 always fits in a long
 1424                try
 61425                {
 61426                    _maxPeerHeaderSize = ConnectionOptions.IceRpcCheckMaxHeaderSize((long)value);
 41427                }
 21428                catch (ArgumentOutOfRangeException exception)
 21429                {
 21430                    throw new InvalidDataException("Received invalid maximum header size setting.", exception);
 1431                }
 41432                _headerSizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 41433            }
 1434            // all other settings are unknown and ignored
 5671435        }
 1436        finally
 5751437        {
 5751438            input.AdvanceTo(readResult.Buffer.End);
 5751439        }
 5671440    }
 1441
 1442    private void RefuseNewInvocations(string message)
 11741443    {
 11741444        lock (_mutex)
 11741445        {
 11741446            _refuseInvocations = true;
 11741447            _invocationRefusedMessage ??= message;
 11741448        }
 11741449    }
 1450
 1451    // The inactivity check executes once in _inactivityTimeout. By then either:
 1452    // - the connection is no longer inactive (and the inactivity check is canceled or being canceled)
 1453    // - the connection is still inactive and we request shutdown
 1454    private void ScheduleInactivityCheck() =>
 13301455        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1456
 1457    private ValueTask<FlushResult> SendControlFrameAsync(
 1458        IceRpcControlFrameType frameType,
 1459        EncodeAction encodeAction,
 1460        CancellationToken cancellationToken)
 7791461    {
 7791462        PipeWriter output = _controlStream!.Output;
 1463
 7791464        EncodeFrame(output);
 1465
 7791466        return output.FlushAsync(cancellationToken); // Flush
 1467
 1468        void EncodeFrame(IBufferWriter<byte> buffer)
 7791469        {
 7791470            var encoder = new SliceEncoder(buffer, SliceEncoding.Slice2);
 7791471            encoder.EncodeIceRpcControlFrameType(frameType);
 7791472            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 7791473            int startPos = encoder.EncodedByteCount; // does not include the size
 7791474            encodeAction.Invoke(ref encoder);
 7791475            int frameSize = encoder.EncodedByteCount - startPos;
 7791476            SliceEncoder.EncodeVarUInt62((uint)frameSize, sizePlaceholder);
 7791477        }
 7791478    }
 1479
 1480    /// <summary>Sends the payload continuation of an outgoing request in the background.</summary>
 1481    /// <remarks>We send the payload continuation on a separate thread with Task.Run: this ensures that the synchronous
 1482    /// activity that could result from reading or writing the payload continuation doesn't delay in any way the
 1483    /// caller. </remarks>
 1484    /// <param name="request">The outgoing request.</param>
 1485    /// <param name="payloadWriter">The payload writer.</param>
 1486    /// <param name="writesClosed">A task that completes when we can no longer write to payloadWriter.</param>
 1487    /// <param name="onGoAway">An action to execute with a CTS when we receive the GoAway frame from the peer.</param>
 1488    /// <param name="cancellationToken">The cancellation token of the invocation; the associated CTS is disposed when
 1489    /// the invocation completes.</param>
 1490    private void SendRequestPayloadContinuation(
 1491        OutgoingRequest request,
 1492        PipeWriter payloadWriter,
 1493        Task writesClosed,
 1494        Action<object?> onGoAway,
 1495        CancellationToken cancellationToken)
 241496    {
 241497        Debug.Assert(request.PayloadContinuation is not null);
 1498
 1499        // First "detach" the continuation.
 241500        PipeReader payloadContinuation = request.PayloadContinuation;
 241501        request.PayloadContinuation = null;
 1502
 241503        lock (_mutex)
 241504        {
 241505            Debug.Assert(_dispatchInvocationCount > 0); // as a result, can't be disposed.
 1506
 1507            // Give the task its own dispatch-invocation count. This ensures the transport connection won't be disposed
 1508            // while the continuation is being sent.
 241509            IncrementDispatchInvocationCount();
 241510        }
 1511
 1512        // This background task owns payloadContinuation, payloadWriter and 1 dispatch-invocation count, and must clean
 1513        // them up. Hence CancellationToken.None.
 241514        _ = Task.Run(PerformSendRequestPayloadContinuationAsync, CancellationToken.None);
 1515
 1516        async Task PerformSendRequestPayloadContinuationAsync()
 241517        {
 241518            bool success = false;
 1519
 1520            try
 241521            {
 1522                // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 241523                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 1524
 1525                // This token registration is needed for one-way requests and is redundant for two-way requests.
 1526                // We want GoAway to cancel the sending of one-way requests that have not been received by the peer,
 1527                // especially when these requests have payload continuations.
 241528                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(onGoAway, cts);
 1529
 1530                try
 241531                {
 1532                    // The cancellation of the InvokeAsync's cancellationToken cancels cts only until InvokeAsync's
 1533                    // PerformInvokeAsync completes. Afterwards, the cancellation of InvokeAsync's cancellationToken has
 1534                    // no effect on cts, so it doesn't cancel the copying of payloadContinuation.
 241535                    FlushResult flushResult = await payloadWriter.CopyFromAsync(
 241536                        payloadContinuation,
 241537                        writesClosed,
 241538                        endStream: true,
 241539                        cts.Token).ConfigureAwait(false);
 1540
 101541                    success = !flushResult.IsCanceled;
 101542                }
 61543                catch (OperationCanceledException exception) when (exception.CancellationToken == cts.Token)
 41544                {
 1545                    // Process/translate this exception primarily for the benefit of _taskExceptionObserver.
 1546
 1547                    // Can be because cancellationToken was canceled by DisposeAsync or GoAway; that's fine.
 41548                    cancellationToken.ThrowIfCancellationRequested();
 1549
 21550                    if (_disposedCts.IsCancellationRequested)
 01551                    {
 1552                        // DisposeAsync aborted the request.
 01553                        throw new IceRpcException(IceRpcError.OperationAborted);
 1554                    }
 1555                    else
 21556                    {
 1557                        // When _goAwayCts is canceled and onGoAway cancels its argument:
 1558                        // - if PerformInvokeAsync is no longer running (typical for a one-way request), we get here
 1559                        // - if PerformInvokeAsync is still running, we may get here or cancellationToken gets canceled
 1560                        // first.
 21561                        Debug.Assert(_goAwayCts.IsCancellationRequested);
 21562                        throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 1563                    }
 1564                }
 101565            }
 141566            catch (Exception exception) when (_taskExceptionObserver is not null)
 101567            {
 101568                _taskExceptionObserver.RequestPayloadContinuationFailed(
 101569                    request,
 101570                    _connectionContext!.TransportConnectionInformation,
 101571                    exception);
 101572            }
 21573            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 21574            {
 1575                // Expected.
 21576            }
 21577            catch (IceRpcException)
 21578            {
 1579                // Expected, with for example IceRpcError.ConnectionAborted when the peer aborts the connection.
 21580            }
 01581            catch (Exception exception)
 01582            {
 1583                // This exception is unexpected when running the IceRPC test suite. A test that expects such an
 1584                // exception must install a task exception observer.
 01585                Debug.Fail($"Failed to send payload continuation of request {request}: {exception}");
 1586
 1587                // If Debug is not enabled and there is no task exception observer, we rethrow to generate an
 1588                // Unobserved Task Exception.
 01589                throw;
 1590            }
 1591            finally
 241592            {
 241593                payloadWriter.CompleteOutput(success);
 241594                payloadContinuation.Complete();
 241595                DecrementDispatchInvocationCount();
 241596            }
 241597        }
 241598    }
 1599}

Methods/Properties

get_IsServer()
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.Transports.TransportConnectionInformation,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DisposeAsync()
PerformDisposeAsync()
InvokeAsync(IceRpc.OutgoingRequest,System.Threading.CancellationToken)
PerformInvokeAsync()
OnGoAway()
DecodeHeader()
EncodeHeader()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
DecodeFieldDictionary(ZeroC.Slice.SliceDecoder&,ZeroC.Slice.DecodeFunc`1<TKey>)
AcceptRequestsAsync()
CheckPeerHeaderSize(System.Int32)
DecrementDispatchInvocationCount()
DecrementStreamInputOutputCount()
DispatchRequestAsync()
PerformDispatchRequestAsync()
DecodeHeader()
EncodeHeader()
EncodeFieldDictionary(System.Collections.Generic.IDictionary`2<TKey,IceRpc.OutgoingFieldValue>,ZeroC.Slice.EncodeAction`1<TKey>,ZeroC.Slice.SliceEncoder&,System.IO.Pipelines.PipeWriter)
IncrementDispatchInvocationCount()
IncrementStreamInputOutputCount(System.Boolean)
ReadGoAwayAsync()
ReceiveControlFrameHeaderAsync()
ReceiveSettingsFrameBody()
RefuseNewInvocations(System.String)
ScheduleInactivityCheck()
SendControlFrameAsync(IceRpc.Internal.IceRpcControlFrameType,ZeroC.Slice.EncodeAction,System.Threading.CancellationToken)
EncodeFrame()
SendRequestPayloadContinuation(IceRpc.OutgoingRequest,System.IO.Pipelines.PipeWriter,System.Threading.Tasks.Task,System.Action`1<System.Object>,System.Threading.CancellationToken)
PerformSendRequestPayloadContinuationAsync()