< Summary

Information
Class: IceRpc.Internal.IceRpcProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs
Tag: 1856_27024993493
Line coverage
91%
Covered lines: 868
Uncovered lines: 76
Coverable lines: 944
Total lines: 1613
Line coverage: 91.9%
Branch coverage
88%
Covered branches: 199
Total branches: 226
Branch coverage: 88%
Method coverage
100%
Covered methods: 34
Fully covered methods: 20
Total methods: 34
Method coverage: 100%
Full method coverage: 58.8%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs

#  Line  Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Collections.Immutable;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Security.Authentication;
 9using ZeroC.Slice.Codec;
 10
 11namespace IceRpc.Internal;
 12
 13internal sealed class IceRpcProtocolConnection : IProtocolConnection
 14{
 15    private const int MaxGoAwayFrameBodySize = 16;
 16    private const int MaxSettingsFrameBodySize = 1024;
 17
 150318    private bool IsServer => _transportConnectionInformation is not null;
 19
 20    private Task? _acceptRequestsTask;
 21
 22    private Task? _connectTask;
 23    private IConnectionContext? _connectionContext; // non-null once the connection is established
 24    private IMultiplexedStream? _controlStream;
 25
 26    // The number of outstanding dispatches and invocations.
 27    // DisposeAsync waits until this count reaches 0 (using _dispatchesAndInvocationsCompleted) before disposing the
 28    // underlying transport connection. So when this count is greater than 0, we know _transportConnection and other
 29    // fields are not disposed.
 30    // _dispatchInvocationCount is also used for the inactivity check: the connection remains active while
 31    // _dispatchInvocationCount > 0 or _streamInputOutputCount > 0.
 32    private int _dispatchInvocationCount;
 33
 34    private readonly SemaphoreSlim? _dispatchSemaphore;
 35
 36    private readonly IDispatcher? _dispatcher;
 40337    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 40338        new(TaskCreationOptions.RunContinuationsAsynchronously);
 39
 40    private Task? _disposeTask;
 41
 42    // This cancellation token source is canceled when the connection is disposed.
 40343    private readonly CancellationTokenSource _disposedCts = new();
 44
 45    // Canceled when we receive the GoAway frame from the peer.
 40346    private readonly CancellationTokenSource _goAwayCts = new();
 47
 48    // The GoAway frame received from the peer. Read it only after _goAwayCts is canceled.
 49    private IceRpcGoAway _goAwayFrame;
 50
 51    // The number of bytes we need to encode a size up to _maxPeerHeaderSize. It's 2 for DefaultMaxIceRpcHeaderSize.
 40352    private int _headerSizeLength = 2;
 53
 54    private readonly TimeSpan _inactivityTimeout;
 55    private readonly Timer _inactivityTimeoutTimer;
 56    private string? _invocationRefusedMessage;
 57
 58    // The ID of the last bidirectional stream accepted by this connection. It's null as long as no bidirectional stream
 59    // was accepted.
 60    private ulong? _lastRemoteBidirectionalStreamId;
 61
 62    // The ID of the last unidirectional stream accepted by this connection. It's null as long as no unidirectional
 63    // stream (other than _remoteControlStream) was accepted.
 64    private ulong? _lastRemoteUnidirectionalStreamId;
 65
 66    private readonly int _maxLocalHeaderSize;
 40367    private int _maxPeerHeaderSize = ConnectionOptions.DefaultMaxIceRpcHeaderSize;
 68
 40369    private readonly Lock _mutex = new();
 70
 71    private Task? _readGoAwayTask;
 72
 73    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 74    private bool _refuseInvocations;
 75
 76    private IMultiplexedStream? _remoteControlStream;
 77
 78    private readonly CancellationTokenSource _shutdownOrGoAwayCts;
 79
 80    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 81    // holding a lock on _mutex.
 40382    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 83
 84    private Task? _shutdownTask;
 85
 86    // Keeps track of the number of stream Input and Output that are not completed yet.
 87    // It's not the same as the _dispatchInvocationCount: a dispatch or invocation can be completed while the
 88    // application is still reading an incoming frame payload that corresponds to a stream input.
 89    // ShutdownAsync waits for both _streamInputOutputCount and _dispatchInvocationCount to reach 0, while DisposeAsync
 90    // only waits for _dispatchInvocationCount to reach 0.
 91    private int _streamInputOutputCount;
 92
 93    // The streams are completed when _shutdownTask is not null and _streamInputOutputCount is 0.
 40394    private readonly TaskCompletionSource _streamsCompleted = new(TaskCreationOptions.RunContinuationsAsynchronously);
 95
 96    private readonly ITaskExceptionObserver? _taskExceptionObserver;
 97
 98    private readonly IMultiplexedConnection _transportConnection;
 99
 100    // Only set for server connections.
 101    private readonly TransportConnectionInformation? _transportConnectionInformation;
 102
 103    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 104        CancellationToken cancellationToken)
 396105    {
 106        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 107
 108        lock (_mutex)
 396109        {
 396110            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 111
 394112            if (_connectTask is not null)
 0113            {
 0114                throw new InvalidOperationException("Cannot call connect more than once.");
 115            }
 116
 394117            result = PerformConnectAsync();
 394118            _connectTask = result;
 394119        }
 394120        return result;
 121
 122        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 394123        {
 124            // Make sure we execute the function without holding the connection mutex lock.
 394125            await Task.Yield();
 126
 127            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 394128            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 394129                cancellationToken,
 394130                _disposedCts.Token);
 131
 132            TransportConnectionInformation transportConnectionInformation;
 133
 134            try
 394135            {
 136                // If the transport connection information is null, we need to connect the transport connection. It's
 137                // null for client connections. The transport connection of a server connection is established by
 138                // Server.
 394139                transportConnectionInformation = _transportConnectionInformation ??
 394140                    await _transportConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 141
 369142                _controlStream = await _transportConnection.CreateStreamAsync(
 369143                    false,
 369144                    connectCts.Token).ConfigureAwait(false);
 145
 364146                var settings = new IceRpcSettings(
 364147                    _maxLocalHeaderSize == ConnectionOptions.DefaultMaxIceRpcHeaderSize ?
 364148                        ImmutableDictionary<IceRpcSettingKey, ulong>.Empty :
 364149                        new Dictionary<IceRpcSettingKey, ulong>
 364150                        {
 364151                            [IceRpcSettingKey.MaxHeaderSize] = (ulong)_maxLocalHeaderSize
 364152                        });
 153
 154                try
 364155                {
 364156                    await SendControlFrameAsync(
 364157                        IceRpcControlFrameType.Settings,
 364158                        settings.Encode,
 364159                        connectCts.Token).ConfigureAwait(false);
 360160                }
 4161                catch
 4162                {
 163                    // If we fail to send the Settings frame, we are in an abortive closure and we close Output to allow
 164                    // the peer to continue if it's waiting for us. This could happen when the cancellation token is
 165                    // canceled.
 4166                    _controlStream!.Output.CompleteOutput(success: false);
 4167                    throw;
 168                }
 169
 170                // Wait for the remote control stream to be accepted and read the protocol Settings frame
 360171                _remoteControlStream = await _transportConnection.AcceptStreamAsync(
 360172                    connectCts.Token).ConfigureAwait(false);
 173
 344174                await ReceiveControlFrameHeaderAsync(
 344175                    IceRpcControlFrameType.Settings,
 344176                    connectCts.Token).ConfigureAwait(false);
 177
 336178                await ReceiveSettingsFrameBody(connectCts.Token).ConfigureAwait(false);
 331179            }
 28180            catch (OperationCanceledException)
 28181            {
 28182                cancellationToken.ThrowIfCancellationRequested();
 183
 9184                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 9185                throw new IceRpcException(
 9186                    IceRpcError.OperationAborted,
 9187                    "The connection establishment was aborted because the connection was disposed.");
 188            }
 7189            catch (InvalidDataException exception)
 7190            {
 7191                throw new IceRpcException(
 7192                    IceRpcError.ConnectionAborted,
 7193                    "The connection establishment was aborted by an icerpc protocol error.",
 7194                    exception);
 195            }
 1196            catch (AuthenticationException)
 1197            {
 1198                throw;
 199            }
 27200            catch (IceRpcException)
 27201            {
 27202                throw;
 203            }
 0204            catch (Exception exception)
 0205            {
 0206                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0207                throw;
 208            }
 209
 210            // This needs to be set before starting the accept requests task below.
 331211            _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 212
 213            // We assign _readGoAwayTask and _acceptRequestsTask with _mutex locked to make sure this assignment
 214            // occurs before the start of DisposeAsync. Once _disposeTask is not null, _readGoAwayTask etc are
 215            // immutable.
 216            lock (_mutex)
 331217            {
 331218                if (_disposeTask is not null)
 0219                {
 0220                    throw new IceRpcException(
 0221                        IceRpcError.OperationAborted,
 0222                        "The connection establishment was aborted because the connection was disposed.");
 223                }
 224
 225                // Read the go away frame from the control stream.
 331226                _readGoAwayTask = ReadGoAwayAsync(_disposedCts.Token);
 227
 228                // Start a task that accepts requests (the "accept requests loop")
 331229                _acceptRequestsTask = AcceptRequestsAsync(_shutdownOrGoAwayCts.Token);
 331230            }
 231
 232            // The _acceptRequestsTask waits for this PerformConnectAsync completion before reading anything. As soon as
 233            // it receives a request, it will cancel this inactivity check.
 331234            ScheduleInactivityCheck();
 235
 331236            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 331237        }
 394238    }
 239
 240    public ValueTask DisposeAsync()
 429241    {
 242        lock (_mutex)
 429243        {
 429244            if (_disposeTask is null)
 403245            {
 403246                RefuseNewInvocations("The connection was disposed.");
 247
 403248                if (_streamInputOutputCount == 0)
 390249                {
 250                    // That's only for consistency. _streamsCompleted.Task matters only to ShutdownAsync.
 390251                    _streamsCompleted.TrySetResult();
 390252                }
 403253                if (_dispatchInvocationCount == 0)
 394254                {
 394255                    _dispatchesAndInvocationsCompleted.TrySetResult();
 394256                }
 257
 403258                _shutdownTask ??= Task.CompletedTask;
 403259                _disposeTask = PerformDisposeAsync();
 403260            }
 429261        }
 429262        return new(_disposeTask);
 263
 264        async Task PerformDisposeAsync()
 403265        {
 266            // Make sure we execute the code below without holding the mutex lock.
 403267            await Task.Yield();
 268
 403269            _disposedCts.Cancel();
 270
 271            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 272
 403273            if (_connectTask is not null)
 394274            {
 275                // We wait for _dispatchesAndInvocationsCompleted (since dispatches and invocations are somewhat under
 276                // our control), but not for _streamsCompleted, since we can't make the application complete the
 277                // incoming payload pipe readers.
 278                try
 394279                {
 394280                    await Task.WhenAll(
 394281                        _connectTask,
 394282                        _acceptRequestsTask ?? Task.CompletedTask,
 394283                        _readGoAwayTask ?? Task.CompletedTask,
 394284                        _shutdownTask,
 394285                        _dispatchesAndInvocationsCompleted.Task).ConfigureAwait(false);
 79286                }
 315287                catch
 315288                {
 289                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 290                    // unexpected exceptions so there's no need to handle them here.
 315291                }
 394292            }
 293
 294            // If the application is still reading some incoming payload, the disposal of the transport connection can
 295            // abort this reading.
 403296            await _transportConnection.DisposeAsync().ConfigureAwait(false);
 297
 298            // It's safe to complete the output since write operations have been completed by the transport connection
 299            // disposal.
 403300            _controlStream?.Output.Complete();
 301
 302            // It's safe to complete the input since read operations have been completed by the transport connection
 303            // disposal.
 403304            _remoteControlStream?.Input.Complete();
 305
 403306            _dispatchSemaphore?.Dispose();
 403307            _disposedCts.Dispose();
 403308            _goAwayCts.Dispose();
 403309            _shutdownOrGoAwayCts.Dispose();
 310
 403311            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 403312        }
 429313    }
 314
 315    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1443316    {
 1443317        if (request.Protocol != Protocol.IceRpc)
 1318        {
 1319            throw new InvalidOperationException(
 1320                $"Cannot send {request.Protocol} request on {Protocol.IceRpc} connection.");
 321        }
 322
 323        lock (_mutex)
 1442324        {
 1442325            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 326
 1441327            if (_refuseInvocations)
 1328            {
 1329                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 330            }
 1440331            if (_connectTask is null)
 0332            {
 0333                throw new InvalidOperationException("Cannot invoke on a connection before connecting it.");
 334            }
 1440335            if (!IsServer && !_connectTask.IsCompletedSuccessfully)
 0336            {
 0337                throw new InvalidOperationException(
 0338                    "Cannot invoke on a client connection that is not fully established.");
 339            }
 340            // It's possible but rare to invoke on a server connection that is still connecting.
 341
 1440342            if (request.ServiceAddress.Fragment.Length > 0)
 0343            {
 0344                throw new NotSupportedException("The icerpc protocol does not support fragments.");
 345            }
 346
 1440347            IncrementDispatchInvocationCount();
 1440348        }
 349
 1440350        return PerformInvokeAsync();
 351
 352        async Task<IncomingResponse> PerformInvokeAsync()
 1440353        {
 354            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1440355            using var invocationCts = CancellationTokenSource.CreateLinkedTokenSource(
 1440356                cancellationToken,
 1440357                _disposedCts.Token);
 358
 1440359            PipeReader? streamInput = null;
 360
 361            // This try/catch block cleans up streamInput (when not null) and decrements the dispatch-invocation count.
 362            try
 1440363            {
 364                // Create the stream.
 365                IMultiplexedStream stream;
 366                try
 1440367                {
 368                    // We want to cancel CreateStreamAsync as soon as the connection is being shut down or has
 369                    // received a GoAway frame.
 1440370                    using CancellationTokenRegistration _ = _shutdownOrGoAwayCts.Token.UnsafeRegister(
 6371                        cts => ((CancellationTokenSource)cts!).Cancel(),
 1440372                        invocationCts);
 373
 1440374                    stream = await _transportConnection.CreateStreamAsync(
 1440375                        bidirectional: !request.IsOneway,
 1440376                        invocationCts.Token).ConfigureAwait(false);
 377
 1431378                    streamInput = stream.IsBidirectional ? stream.Input : null;
 1431379                }
 8380                catch (OperationCanceledException)
 8381                {
 8382                    cancellationToken.ThrowIfCancellationRequested();
 383
 384                    // The connection was shut down or disposed and we did not read the payload at all.
 6385                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 386                }
 1387                catch (IceRpcException exception)
 1388                {
 1389                    RefuseNewInvocations("The connection was lost.");
 1390                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 391                }
 0392                catch (Exception exception)
 0393                {
 0394                    Debug.Fail($"CreateStreamAsync failed with an unexpected exception: {exception}");
 0395                    RefuseNewInvocations("The connection was lost.");
 0396                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 397                }
 398
 1431399                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(
 1431400                    OnGoAway,
 1431401                    invocationCts);
 402
 403                PipeWriter payloadWriter;
 404
 405                try
 1431406                {
 407                    lock (_mutex)
 1431408                    {
 1431409                        if (_refuseInvocations)
 0410                        {
 411                            // Both stream.Input and stream.Output are completed by the catch/finally blocks below.
 0412                            throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 413                        }
 414
 1431415                        IncrementStreamInputOutputCount(stream.IsBidirectional);
 416
 417                        // Decorate the stream to decrement the input/output count on Complete.
 1431418                        stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 1431419                        streamInput = stream.IsBidirectional ? stream.Input : null;
 1431420                    }
 421
 1431422                    EncodeHeader(stream.Output);
 1430423                    payloadWriter = request.GetPayloadWriter(stream.Output);
 1430424                }
 1425                catch
 1426                {
 1427                    stream.Output.CompleteOutput(success: false);
 1428                    throw;
 429                }
 430
 431                // From now on, we only use payloadWriter to write and we make sure to complete it.
 432
 1430433                bool hasContinuation = request.PayloadContinuation is not null;
 434                FlushResult flushResult;
 435
 436                try
 1430437                {
 1430438                    flushResult = await payloadWriter.CopyFromAsync(
 1430439                        request.Payload,
 1430440                        stream.WritesClosed,
 1430441                        endStream: !hasContinuation,
 1430442                        invocationCts.Token).ConfigureAwait(false);
 1422443                }
 8444                catch
 8445                {
 8446                    payloadWriter.CompleteOutput(success: false);
 8447                    request.PayloadContinuation?.Complete();
 8448                    throw;
 449                }
 450                finally
 1430451                {
 1430452                    request.Payload.Complete();
 1430453                }
 454
 1422455                if (flushResult.IsCompleted || flushResult.IsCanceled || !hasContinuation)
 1410456                {
 457                    // The remote reader doesn't want more data, or the copying was canceled, or there is no
 458                    // continuation: we're done.
 1410459                    payloadWriter.CompleteOutput(!flushResult.IsCanceled);
 1410460                    request.PayloadContinuation?.Complete();
 1410461                }
 462                else
 12463                {
 464                    // Sends the payload continuation in a background thread.
 12465                    SendRequestPayloadContinuation(
 12466                        request,
 12467                        payloadWriter,
 12468                        stream.WritesClosed,
 12469                        OnGoAway,
 12470                        invocationCts.Token);
 12471                }
 472
 1422473                if (request.IsOneway)
 1010474                {
 1010475                    return new IncomingResponse(request, _connectionContext!);
 476                }
 477
 412478                Debug.Assert(streamInput is not null);
 479
 480                try
 412481                {
 412482                    ReadResult readResult = await streamInput.ReadSliceSegmentAsync(
 412483                        _maxLocalHeaderSize,
 412484                        invocationCts.Token).ConfigureAwait(false);
 485
 486                    // Nothing cancels the stream input pipe reader.
 390487                    Debug.Assert(!readResult.IsCanceled);
 488
 390489                    if (readResult.Buffer.IsEmpty)
 0490                    {
 0491                        throw new IceRpcException(
 0492                            IceRpcError.IceRpcError,
 0493                            "Received an icerpc response with an empty header.");
 494                    }
 495
 390496                    (StatusCode statusCode, string? errorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? fieldsPipeReader) =
 390497                        DecodeHeader(readResult.Buffer);
 389498                    stream.Input.AdvanceTo(readResult.Buffer.End);
 499
 389500                    if (statusCode == StatusCode.TruncatedPayload && invocationCts.Token.IsCancellationRequested)
 0501                    {
 502                        // Canceling the sending of the payload continuation triggers the completion of the stream
 503                        // output. This may lead to a TruncatedPayload if the dispatch is currently reading the payload
 504                        // continuation. In such cases, we prioritize throwing an OperationCanceledException.
 0505                        fieldsPipeReader?.Complete();
 0506                        invocationCts.Token.ThrowIfCancellationRequested();
 0507                    }
 508
 389509                    var response = new IncomingResponse(
 389510                        request,
 389511                        _connectionContext!,
 389512                        statusCode,
 389513                        errorMessage,
 389514                        fields,
 389515                        fieldsPipeReader)
 389516                    {
 389517                        Payload = streamInput
 389518                    };
 519
 389520                    streamInput = null; // response now owns the stream input
 389521                    return response;
 522                }
 2523                catch (InvalidDataException exception)
 2524                {
 2525                    throw new IceRpcException(
 2526                        IceRpcError.IceRpcError,
 2527                        "Received an icerpc response with an invalid header.",
 2528                        exception);
 529                }
 530
 531                void OnGoAway(object? cts)
 12532                {
 12533                    if (!stream.IsStarted ||
 12534                        stream.Id >=
 12535                            (stream.IsBidirectional ?
 12536                                _goAwayFrame.BidirectionalStreamId :
 12537                                _goAwayFrame.UnidirectionalStreamId))
 4538                    {
 539                        // The request wasn't received by the peer so it's safe to cancel the invocation.
 4540                        ((CancellationTokenSource)cts!).Cancel();
 4541                    }
 12542                }
 543            }
 17544            catch (OperationCanceledException exception) when (exception.CancellationToken == invocationCts.Token)
 14545            {
 14546                cancellationToken.ThrowIfCancellationRequested();
 547
 7548                if (_disposedCts.IsCancellationRequested)
 4549                {
 550                    // DisposeAsync aborted the request.
 4551                    throw new IceRpcException(IceRpcError.OperationAborted);
 552                }
 553                else
 3554                {
 3555                    Debug.Assert(_goAwayCts.IsCancellationRequested);
 3556                    throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 557                }
 558            }
 559            finally
 1440560            {
 1440561                streamInput?.Complete();
 1440562                DecrementDispatchInvocationCount();
 1440563            }
 564
 565            static (StatusCode StatusCode, string? ErrorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 566                ReadOnlySequence<byte> buffer)
 390567            {
 390568                var decoder = new SliceDecoder(buffer);
 569
 390570                StatusCode statusCode = decoder.DecodeStatusCode();
 390571                string? errorMessage = statusCode == StatusCode.Ok ? null : decoder.DecodeString();
 572
 390573                (IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 390574                    DecodeFieldDictionary(
 390575                        ref decoder,
 393576                        (ref SliceDecoder decoder) => decoder.DecodeResponseFieldKey());
 577
 389578                return (statusCode, errorMessage, fields, pipeReader);
 389579            }
 580
 581            void EncodeHeader(PipeWriter streamOutput)
 1431582            {
 1431583                var encoder = new SliceEncoder(streamOutput);
 584
 585                // Write the IceRpc request header.
 1431586                Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 587
 588                // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 1431589                long headerStartPos = streamOutput.UnflushedBytes;
 590
 1431591                var header = new IceRpcRequestHeader(request.ServiceAddress.Path, request.Operation);
 592
 1431593                header.Encode(ref encoder);
 594
 1431595                EncodeFieldDictionary(
 1431596                    request.Fields,
 14597                    (ref SliceEncoder encoder, RequestFieldKey key) => encoder.EncodeRequestFieldKey(key),
 1431598                    ref encoder,
 1431599                    streamOutput);
 600
 601                // We're done with the header encoding, write the header size.
 1431602                int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 1431603                CheckPeerHeaderSize(headerSize);
 1430604                SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 1430605            }
 1399606        }
 1440607    }
 608
 609    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 109610    {
 611        lock (_mutex)
 109612        {
 109613            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 614
 106615            if (_shutdownTask is not null)
 2616            {
 2617                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 618            }
 104619            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3620            {
 3621                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 622            }
 623
 101624            RefuseNewInvocations("The connection was shut down.");
 625
 101626            if (_streamInputOutputCount == 0)
 79627            {
 79628                _streamsCompleted.TrySetResult();
 79629            }
 101630            if (_dispatchInvocationCount == 0)
 78631            {
 78632                _dispatchesAndInvocationsCompleted.TrySetResult();
 78633            }
 634
 101635            _shutdownTask = PerformShutdownAsync();
 101636        }
 101637        return _shutdownTask;
 638
 639        async Task PerformShutdownAsync()
 101640        {
 101641            await Task.Yield(); // exit mutex lock
 642
 101643            _shutdownOrGoAwayCts.Cancel();
 644
 645            try
 101646            {
 101647                Debug.Assert(_acceptRequestsTask is not null);
 101648                Debug.Assert(_controlStream is not null);
 101649                Debug.Assert(_readGoAwayTask is not null);
 101650                Debug.Assert(_remoteControlStream is not null);
 651
 101652                await _acceptRequestsTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 653
 89654                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 655
 656                // Once _shutdownTask is not null, _lastRemoteBidirectionalStreamId and _lastRemoteUnidirectionalStreamId
 657                // are immutable.
 658
 659                // When this peer is the server endpoint, the first accepted bidirectional stream ID is 0. When this
 660                // peer is the client endpoint, the first accepted bidirectional stream ID is 1.
 89661                IceRpcGoAway goAwayFrame = new(
 89662                    _lastRemoteBidirectionalStreamId is ulong value ? value + 4 : (IsServer ? 0ul : 1ul),
 89663                    (_lastRemoteUnidirectionalStreamId ?? _remoteControlStream.Id) + 4);
 664
 665                try
 89666                {
 89667                    _ = await SendControlFrameAsync(
 89668                        IceRpcControlFrameType.GoAway,
 89669                        goAwayFrame.Encode,
 89670                        cts.Token).ConfigureAwait(false);
 671
 672                    // Wait for the peer to send back a GoAway frame. The task should already be completed if the
 673                    // shutdown was initiated by the peer.
 87674                    await _readGoAwayTask.WaitAsync(cts.Token).ConfigureAwait(false);
 675
 676                    // Wait for all streams (other than the control streams) to have their Input and Output completed.
 77677                    await _streamsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 678
 679                    // Close the control stream to notify the peer that on our side, all the streams completed and that
 680                    // it can close the transport connection whenever it likes.
 76681                    _controlStream.Output.CompleteOutput(success: true);
 76682                }
 13683                catch
 13684                {
 685                    // If we fail to send the GoAway frame or some other failure occur (such as
 686                    // OperationCanceledException) we are in an abortive closure and we close Output to allow
 687                    // the peer to continue if it's waiting for us.
 13688                    _controlStream.Output.CompleteOutput(success: false);
 13689                    throw;
 690                }
 691
 692                // Wait for the peer notification that on its side all the streams are completed. It's important to wait
 693                // for this notification before closing the connection. In particular with QUIC where closing the
 694                // connection before all the streams are processed could lead to a stream failure.
 695                try
 76696                {
 697                    // Wait for the _remoteControlStream Input completion.
 76698                    ReadResult readResult = await _remoteControlStream.Input.ReadAsync(cts.Token).ConfigureAwait(false);
 699
 73700                    Debug.Assert(!readResult.IsCanceled);
 701
 73702                    if (!readResult.IsCompleted || !readResult.Buffer.IsEmpty)
 0703                    {
 0704                        throw new IceRpcException(
 0705                            IceRpcError.IceRpcError,
 0706                            "Received bytes on the control stream after receiving the GoAway frame.");
 707                    }
 708
 709                    // We can now safely close the connection.
 73710                    await _transportConnection.CloseAsync(MultiplexedConnectionCloseError.NoError, cts.Token)
 73711                        .ConfigureAwait(false);
 71712                }
 3713                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.ConnectionClosedByPeer)
 1714                {
 715                    // Expected if the peer closed the connection first.
 1716                }
 717
 718                // We wait for the completion of the dispatches that we created (and, secondarily, invocations).
 72719                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 72720            }
 10721            catch (OperationCanceledException)
 10722            {
 10723                cancellationToken.ThrowIfCancellationRequested();
 724
 4725                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 4726                throw new IceRpcException(
 4727                    IceRpcError.OperationAborted,
 4728                    "The connection shutdown was aborted because the connection was disposed.");
 729            }
 0730            catch (InvalidDataException exception)
 0731            {
 0732                throw new IceRpcException(
 0733                    IceRpcError.IceRpcError,
 0734                    "The connection shutdown was aborted by an icerpc protocol error.",
 0735                    exception);
 736            }
 19737            catch (IceRpcException)
 19738            {
 19739                throw;
 740            }
 0741            catch (Exception exception)
 0742            {
 0743                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0744                throw;
 745            }
 72746        }
 101747    }
 748
 403749    internal IceRpcProtocolConnection(
 403750        IMultiplexedConnection transportConnection,
 403751        TransportConnectionInformation? transportConnectionInformation,
 403752        ConnectionOptions options,
 403753        ITaskExceptionObserver? taskExceptionObserver)
 403754    {
            // Constructs the protocol connection over the given multiplexed transport connection. The constructor only
            // assigns fields, creates the optional dispatch semaphore and creates the inactivity timer.
            //  - transportConnection: the multiplexed connection used to accept streams and to close the connection.
            //  - transportConnectionInformation: a non-null value makes IsServer return true.
            //  - options: supplies the dispatcher, the max icerpc header size, the max dispatches and the inactivity
            //    timeout.
            //  - taskExceptionObserver: optional observer notified when a dispatch is refused or fails.

            // This source is canceled when either _disposedCts or _goAwayCts is canceled; ShutdownAsync also cancels
            // it directly.
 403755        _shutdownOrGoAwayCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, _goAwayCts.Token);
 756
 403757        _taskExceptionObserver = taskExceptionObserver;
 758
 403759        _transportConnection = transportConnection;
 403760        _dispatcher = options.Dispatcher;
 403761        _maxLocalHeaderSize = options.MaxIceRpcHeaderSize;
 403762        _transportConnectionInformation = transportConnectionInformation;
 763
            // The semaphore is created only for a positive MaxDispatches. Without it, PerformDispatchRequestAsync
            // dispatches requests without waiting.
 403764        if (options.MaxDispatches > 0)
 403765        {
 403766            _dispatchSemaphore = new SemaphoreSlim(
 403767                initialCount: options.MaxDispatches,
 403768                maxCount: options.MaxDispatches);
 403769        }
 770
 403771        _inactivityTimeout = options.InactivityTimeout;
            // The timer is created with a callback only (no due time): it does not fire until its due time is changed.
            // Presumably ScheduleInactivityCheck (not visible in this part of the file) arms it - confirm.
 403772        _inactivityTimeoutTimer = new Timer(_ =>
 5773        {
 5774            bool requestShutdown = false;
 403775
            // Re-check the idle condition under the mutex: a shutdown, dispatch, invocation or stream may have started
            // since the timer was scheduled.
 403776            lock (_mutex)
 5777            {
 5778                if (_shutdownTask is null && _dispatchInvocationCount == 0 && _streamInputOutputCount == 0)
 5779                {
 5780                    requestShutdown = true;
 5781                    RefuseNewInvocations(
 5782                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
                        // NOTE(review): the interpolated string above is cut off at the right margin of this listing;
                        // the end of the message and the closing of the call are not visible here.
 5783                }
 5784            }
 403785
 5786            if (requestShutdown)
 5787            {
 403788                // TrySetResult must be called outside the mutex lock.
 5789                _shutdownRequestedTcs.TrySetResult();
 5790            }
 408791        });
 403792    }
 793
 794    private static (IDictionary<TKey, ReadOnlySequence<byte>>, PipeReader?) DecodeFieldDictionary<TKey>(
 795        ref SliceDecoder decoder,
 796        DecodeFunc<TKey> decodeKeyFunc) where TKey : struct
 1813797    {
        // Decodes a fields dictionary (a size followed by key / size-prefixed value entries) from decoder.
        // Returns the decoded fields together with the pipe reader that holds the bytes of the field values, or the
        // empty dictionary and a null pipe reader when the encoded dictionary is empty.
        // Throws InvalidDataException when the field count can't fit in the remaining bytes, when a field size is
        // larger than int.MaxValue, when a field value extends beyond the end of the buffer or when a key is
        // duplicated.
 1813798        int count = decoder.DecodeSize();
 799
 800        IDictionary<TKey, ReadOnlySequence<byte>> fields;
 801        PipeReader? pipeReader;
 1813802        if (count == 0)
 1793803        {
            // No field: return the shared empty dictionary, there is no buffer to keep alive.
 1793804            fields = ImmutableDictionary<TKey, ReadOnlySequence<byte>>.Empty;
 1793805            pipeReader = null;
 1793806            decoder.CheckEndOfBuffer();
 1793807        }
 808        else
 20809        {
 810            // We don't use the normal collection allocation check here because SizeOf<ReadOnlySequence<byte>> is quite
 811            // large (24).
 812            // For example, say we decode a fields dictionary with a single field with an empty value. It's encoded
 813            // using 1 byte (dictionary size) + 1 byte (key) + 1 byte (value size) = 3 bytes. The decoder's default max
 814            // allocation size is 3 * 8 = 24. If we simply call IncreaseCollectionAllocation(1, 4 + 24), we'll exceed
 815            // the default collection allocation limit. (sizeof TKey is currently 4 but could/should increase to 8).
 816
 817            // Each field consumes at least 2 bytes: 1 for the key and one for the value size.
            // The multiplication is performed with a long to avoid an int overflow when count is large.
 20818            if ((long)count * 2 > decoder.Remaining)
 2819            {
 2820                throw new InvalidDataException("Too many fields.");
 821            }
 822
            // Copy the encoded fields into a new pipe: the dictionary values created below are slices of the buffer
            // returned by this pipe's reader.
            // NOTE(review): presumably CopyTo copies the bytes not yet consumed by decoder - confirm.
 18823            fields = new Dictionary<TKey, ReadOnlySequence<byte>>(count);
 18824            var pipe = new Pipe();
 18825            decoder.CopyTo(pipe.Writer);
 18826            pipe.Writer.Complete();
 827
 828            try
 18829            {
 18830                _ = pipe.Reader.TryRead(out ReadResult readResult);
 18831                var fieldsDecoder = new SliceDecoder(readResult.Buffer);
 832
 72833                for (int i = 0; i < count; ++i)
 19834                {
 835                    // Decode the field key.
 19836                    TKey key = decodeKeyFunc(ref fieldsDecoder);
 837
 838                    // Decode and check the field value size.
 839                    int valueSize;
 840                    try
 19841                    {
 19842                        valueSize = checked((int)fieldsDecoder.DecodeVarUInt62());
 19843                    }
 0844                    catch (OverflowException exception)
 0845                    {
 0846                        throw new InvalidDataException("The field size can't be larger than int.MaxValue.", exception);
 847                    }
 848
 19849                    if (valueSize > fieldsDecoder.Remaining)
 0850                    {
 0851                        throw new InvalidDataException(
 0852                            $"The value of field '{key}' extends beyond the end of the buffer.");
 853                    }
 854
 855                    // Create a ROS reference to the field value by slicing the fields pipe reader ROS.
 19856                    ReadOnlySequence<byte> value = readResult.Buffer.Slice(fieldsDecoder.Consumed, valueSize);
 857                    try
 19858                    {
 19859                        fields.Add(key, value);
 18860                    }
 1861                    catch (ArgumentException exception)
 1862                    {
                        // Dictionary.Add throws ArgumentException for a key that is already present.
 1863                        throw new InvalidDataException(
 1864                            $"Received icerpc header with duplicate field key '{key}'.", exception);
 865                    }
 866
 867                    // Skip the field value to prepare the decoder to read the next field value.
 18868                    fieldsDecoder.Skip(valueSize);
 18869                }
 17870                fieldsDecoder.CheckEndOfBuffer();
 871
 17872                pipe.Reader.AdvanceTo(readResult.Buffer.Start); // complete read without consuming anything
 873
 17874                pipeReader = pipe.Reader;
 17875            }
 1876            catch
 1877            {
                // Decoding failed: the reader is not returned to the caller, so it is completed here.
 1878                pipe.Reader.Complete();
 1879                throw;
 880            }
 17881        }
 882
 883        // The caller is responsible for completing the pipe reader.
 1810884        return (fields, pipeReader);
 1810885    }
 886
 887    private async Task AcceptRequestsAsync(CancellationToken cancellationToken)
 331888    {
        // Accepts the streams opened by the peer and starts a dispatch task for each of them, until cancellationToken
        // is canceled or accepting a stream fails.
        // A cancellation ends this task successfully. An IceRpcException or an unexpected exception refuses new
        // invocations, completes _shutdownRequestedTcs and is rethrown.
 331889        await Task.Yield(); // exit mutex lock
 890
 891        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 892        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 893        // _connectTask has completed successfully. The creation of the _acceptRequestsTask is the last action taken by
 894        // _connectTask and as a result this await can't fail.
 331895        await _connectTask!.ConfigureAwait(false);
 896
 897        try
 331898        {
 899            // We check the cancellation token for each iteration because we want to exit the accept requests loop as
 900            // soon as ShutdownAsync/GoAway requests this cancellation, even when more streams can be accepted without
 901            // waiting.
 1763902            while (!cancellationToken.IsCancellationRequested)
 1763903            {
 904                // When _dispatcher is null, the multiplexed connection MaxUnidirectionalStreams and
 905                // MaxBidirectionalStreams options are configured to not accept any request-stream from the peer. As a
 906                // result, when _dispatcher is null, this call will block indefinitely until the cancellation token is
 907                // canceled by ShutdownAsync, GoAway or DisposeAsync.
 1763908                IMultiplexedStream stream = await _transportConnection.AcceptStreamAsync(cancellationToken)
 1763909                    .ConfigureAwait(false);
 910
 911                lock (_mutex)
 1432912                {
 913                    // We don't want to increment _dispatchInvocationCount/_streamInputOutputCount when the connection
 914                    // is shutting down or being disposed.
 1432915                    if (_shutdownTask is not null)
 0916                    {
 917                        // Note that cancellationToken may not be canceled yet at this point.
                        // NOTE(review): the stream accepted just above is not completed on this path; presumably it
                        // is cleaned up when the transport connection is closed or disposed - confirm.
 0918                        throw new OperationCanceledException();
 919                    }
 920
 921                    // The logic in IncrementStreamInputOutputCount requires that we increment the dispatch-invocation
 922                    // count first.
 1432923                    IncrementDispatchInvocationCount();
 1432924                    IncrementStreamInputOutputCount(stream.IsBidirectional);
 925
 926                    // Decorate the stream to decrement the stream input/output count on Complete.
 1432927                    stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 928
 929                    // The multiplexed connection guarantees that the IDs of accepted streams of a given type have ever
 930                    // increasing values.
                    // ShutdownAsync uses these last stream IDs to compute the IDs sent in the GoAway frame.
 931
 1432932                    if (stream.IsBidirectional)
 423933                    {
 423934                        _lastRemoteBidirectionalStreamId = stream.Id;
 423935                    }
 936                    else
 1009937                    {
 1009938                        _lastRemoteUnidirectionalStreamId = stream.Id;
 1009939                    }
 1432940                }
 941
 942                // Start a task to read the stream and dispatch the request. We pass CancellationToken.None to Task.Run
 943                // because DispatchRequestAsync must clean-up the stream and the dispatch-invocation count.
 2864944                _ = Task.Run(() => DispatchRequestAsync(stream), CancellationToken.None);
 1432945            }
 0946        }
 256947        catch (OperationCanceledException)
 256948        {
 949            // Expected, the associated cancellation token source was canceled.
            // This handler also catches the OperationCanceledException thrown above when _shutdownTask is not null.
 256950        }
 75951        catch (IceRpcException)
 75952        {
            // Accepting a stream failed: refuse new invocations and request a shutdown of this connection.
 75953            RefuseNewInvocations("The connection was lost");
 75954            _ = _shutdownRequestedTcs.TrySetResult();
 75955            throw;
 956        }
 0957        catch (Exception exception)
 0958        {
            // Same handling as IceRpcException, plus a debug failure since this exception is not expected.
 0959            Debug.Fail($"The accept stream task failed with an unexpected exception: {exception}");
 0960            RefuseNewInvocations("The connection was lost");
 0961            _ = _shutdownRequestedTcs.TrySetResult();
 0962            throw;
 963        }
 256964    }
 965
 966    private void CheckPeerHeaderSize(int headerSize)
 1831967    {
        // Verifies that the size of a header encoded by this connection does not exceed _maxPeerHeaderSize.
        // Throws IceRpcException with IceRpcError.LimitExceeded when headerSize is greater than _maxPeerHeaderSize.
 1831968        if (headerSize > _maxPeerHeaderSize)
 2969        {
 2970            throw new IceRpcException(
 2971                IceRpcError.LimitExceeded,
 2972                $"The header size ({headerSize}) for an icerpc request or response is greater than the peer's max header
                // NOTE(review): the interpolated string above is cut off at the right margin of this listing; the end
                // of the message and the closing of the throw statement are not visible here.
 973        }
 1829974    }
 975
 976    private void DecrementDispatchInvocationCount()
 2884977    {
        // Decrements the dispatch-invocation count. Unlike the Increment methods, this method acquires _mutex itself.
 978        lock (_mutex)
 2884979        {
 2884980            if (--_dispatchInvocationCount == 0)
 807981            {
 807982                if (_shutdownTask is not null)
 29983                {
                    // ShutdownAsync awaits this completion source after closing the transport connection.
 29984                    _dispatchesAndInvocationsCompleted.TrySetResult();
 29985                }
 778986                else if (!_refuseInvocations && _streamInputOutputCount == 0)
 660987                {
                    // No outstanding dispatch, invocation or stream input/output, and invocations are still accepted:
                    // the connection is now idle, so schedule the inactivity check.
 660988                    ScheduleInactivityCheck();
 660989                }
 807990            }
 2884991        }
 2884992    }
 993
 994    /// <summary>Decrements the stream input/output count. When the count reaches 0, this method either completes
        /// <c>_streamsCompleted</c> (shutdown in progress) or schedules the inactivity check (connection idle).
        /// </summary>
        /// <remarks>This method acquires _mutex. AcceptRequestsAsync passes it to the stream decorator, which calls it
        /// on Complete.</remarks>
 995    private void DecrementStreamInputOutputCount()
 3702996    {
 997        lock (_mutex)
 3702998        {
 3702999            if (--_streamInputOutputCount == 0)
 7981000            {
 7981001                if (_shutdownTask is not null)
 281002                {
                     // ShutdownAsync awaits this completion source before closing the control stream.
 281003                    _streamsCompleted.TrySetResult();
 281004                }
 7701005                else if (!_refuseInvocations && _dispatchInvocationCount == 0)
 1071006                {
 1007                    // We enable the inactivity check in order to complete _shutdownRequestedTcs when inactive for too
 1008                    // long. _refuseInvocations is true when the connection is either about to be "shutdown requested",
 1009                    // or shut down / disposed. We don't need to complete _shutdownRequestedTcs in any of these
 1010                    // situations.
 1071011                    ScheduleInactivityCheck();
 1071012                }
 7981013            }
 37021014        }
 37021015    }
 1016
 1017    private async Task DispatchRequestAsync(IMultiplexedStream stream)
 14321018    {
         // Reads and decodes the request header from stream, dispatches the request and, for a two-way request, sends
         // the response back on the stream.
         // The caller (AcceptRequestsAsync) incremented the dispatch-invocation count before starting this method;
         // this method always decrements it in its finally block and completes the stream input/output when the
         // dispatch does not succeed.
         // IceRpcException and the expected cancellations are swallowed; any other exception not handled by the task
         // exception observer is rethrown.
 1019        // _disposedCts is not disposed since we own a dispatch count.
         // NOTE(review): for a bidirectional stream, the token presumably gets canceled when the stream writes are
         // closed or when _disposedCts is canceled - confirm against AsCancellationToken.
 14321020        CancellationToken cancellationToken = stream.IsBidirectional ?
 14321021            stream.WritesClosed.AsCancellationToken(_disposedCts.Token) :
 14321022            _disposedCts.Token;
 1023
 14321024        PipeReader? fieldsPipeReader = null;
 1025        IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields;
 1026        IceRpcRequestHeader header;
 1027
         // streamInput is set to null once the request owns it; streamOutput is null for a unidirectional stream.
 14321028        PipeReader? streamInput = stream.Input;
 14321029        PipeWriter? streamOutput = stream.IsBidirectional ? stream.Output : null;
 14321030        bool success = false;
 1031
 1032        try
 14321033        {
 1034            try
 14321035            {
                 // Read the header segment; _maxLocalHeaderSize is passed as the limit for this read.
 14321036                ReadResult readResult = await streamInput.ReadSliceSegmentAsync(
 14321037                    _maxLocalHeaderSize,
 14321038                    cancellationToken).ConfigureAwait(false);
 1039
 14271040                if (readResult.Buffer.IsEmpty)
 11041                {
 11042                    throw new IceRpcException(IceRpcError.IceRpcError, "Received icerpc request with empty header.");
 1043                }
 1044
 14261045                (header, fields, fieldsPipeReader) = DecodeHeader(readResult.Buffer);
 14211046                streamInput.AdvanceTo(readResult.Buffer.End);
 14211047            }
 81048            catch (InvalidDataException exception)
 81049            {
                 // An invalid header is reported as an icerpc protocol error: thrown when there is no observer,
                 // otherwise given to the observer.
 81050                var rpcException = new IceRpcException(
 81051                    IceRpcError.IceRpcError,
 81052                    "Received invalid icerpc request header.",
 81053                    exception);
 1054
 81055                if (_taskExceptionObserver is null)
 11056                {
 11057                    throw rpcException;
 1058                }
 1059                else
 71060                {
 71061                    _taskExceptionObserver.DispatchRefused(
 71062                        _connectionContext!.TransportConnectionInformation,
 71063                        rpcException);
 71064                    return; // success remains false
 1065                }
 1066            }
 31067            catch (Exception exception) when (_taskExceptionObserver is not null)
 21068            {
                 // With an observer installed, any other failure to read or decode the header is reported as a refused
                 // dispatch instead of propagating to the outer catch blocks.
 21069                _taskExceptionObserver.DispatchRefused(_connectionContext!.TransportConnectionInformation, exception);
 21070                return; // success remains false
 1071            }
 1072
 14211073            using var request = new IncomingRequest(Protocol.IceRpc, _connectionContext!)
 14211074            {
 14211075                Fields = fields,
 14211076                IsOneway = !stream.IsBidirectional,
 14211077                Operation = header.Operation,
 14211078                Path = header.Path,
 14211079                Payload = streamInput
 14211080            };
 1081
 14211082            streamInput = null; // the request now owns streamInput
 1083
 1084            try
 14211085            {
 14211086                OutgoingResponse response = await PerformDispatchRequestAsync(request, cancellationToken)
 14211087                    .ConfigureAwait(false);
 1088
                 // The response is sent only for a two-way request.
 14101089                if (!request.IsOneway)
 4011090                {
 4011091                    Debug.Assert(streamOutput is not null);
 4011092                    EncodeHeader(response);
 1093
 3991094                    PipeWriter payloadWriter = response.GetPayloadWriter(streamOutput);
 1095
 1096                    // We give flushResult an initial "failed" value, in case the first CopyFromAsync throws.
 3991097                    var flushResult = new FlushResult(isCanceled: true, isCompleted: false);
 1098
 1099                    try
 3991100                    {
 1101                        // We don't use cancellationToken here because it's canceled shortly afterwards by the
 1102                        // completion of writesClosed. This works around https://github.com/dotnet/runtime/issues/82704
 1103                        // where the stream would otherwise be aborted after the successful write. It's also fine to
 1104                        // just use _disposedCts.Token: if writes are closed because the peer is no longer interested
 1105                        // in the response, the write operations will raise an IceRpcException(StreamAborted) which is
 1106                        // ignored.
 3991107                        bool hasContinuation = response.PayloadContinuation is not null;
 1108
 3991109                        flushResult = await payloadWriter.CopyFromAsync(
 3991110                            response.Payload,
 3991111                            stream.WritesClosed,
 3991112                            endStream: !hasContinuation,
 3991113                            _disposedCts.Token).ConfigureAwait(false);
 1114
                         // The payload continuation is sent only when the payload was copied without the writer
                         // reporting a completed or canceled flush.
 3961115                        if (!flushResult.IsCompleted && !flushResult.IsCanceled && hasContinuation)
 21116                        {
 21117                            flushResult = await payloadWriter.CopyFromAsync(
 21118                                response.PayloadContinuation!,
 21119                                stream.WritesClosed,
 21120                                endStream: true,
 21121                                _disposedCts.Token).ConfigureAwait(false);
 11122                        }
 3951123                    }
 1124                    finally
 3991125                    {
                         // flushResult keeps its initial "canceled" value when CopyFromAsync throws, in which case the
                         // output is completed with success: false.
 3991126                        payloadWriter.CompleteOutput(success: !flushResult.IsCanceled);
 3991127                        response.Payload.Complete();
 3991128                        response.PayloadContinuation?.Complete();
 3991129                    }
 3951130                }
 14041131            }
 171132            catch (Exception exception) when (_taskExceptionObserver is not null)
 71133            {
                 // With an observer installed, a failure of the dispatch or of the sending of the response is reported
                 // to the observer instead of propagating to the outer catch blocks.
 71134                _taskExceptionObserver.DispatchFailed(
 71135                    request,
 71136                    _connectionContext!.TransportConnectionInformation,
 71137                    exception);
 71138                return; // success remains false
 1139            }
 14041140            success = true;
 14041141        }
 11142        catch (IceRpcException)
 11143        {
 1144            // Expected, with for example:
 1145            //  - IceRpcError.ConnectionAborted when the peer aborts the connection
 1146            //  - IceRpcError.IceRpcError when the request header is invalid
 1147            //  - IceRpcError.TruncatedData when the request header is truncated
 11148        }
 111149        catch (OperationCanceledException exception) when (
 111150            exception.CancellationToken == cancellationToken ||
 111151            exception.CancellationToken == _disposedCts.Token)
 111152        {
 1153            // Expected if the dispatch is canceled by the peer or the connection is disposed.
 111154        }
 01155        catch (Exception exception)
 01156        {
 1157            // This exception is unexpected when running the IceRPC test suite. A test that expects this exception must
 1158            // install a task exception observer.
 01159            Debug.Fail($"icerpc dispatch failed with an unexpected exception: {exception}");
 1160
 1161            // Generate unobserved task exception (UTE). If this exception is expected (e.g. an expected payload read
 1162            // exception) and the application wants to avoid this UTE, it must configure a non-null logger to install
 1163            // a task exception observer.
 01164            throw;
 1165        }
 1166        finally
 14321167        {
 14321168            if (!success)
 281169            {
 1170                // We always need to complete streamOutput when an exception is thrown. For example, we received an
 1171                // invalid request header that we could not decode.
                 // streamInput is completed here only when the request does not own it yet (it is non-null).
 281172                streamOutput?.CompleteOutput(success: false);
 281173                streamInput?.Complete();
 281174            }
             // Completing this reader ends the use of the buffer referenced by the decoded field values.
 14321175            fieldsPipeReader?.Complete();
 1176
 14321177            DecrementDispatchInvocationCount();
 14321178        }
 1179
         // Dispatches the request with _dispatcher and returns the response to send. The dispatch semaphore, when
         // there is one, is acquired before the dispatch and released after it. A cancellation carrying
         // cancellationToken is rethrown; any other exception is converted into a response created from a
         // DispatchException.
 1180        async Task<OutgoingResponse> PerformDispatchRequestAsync(
 1181            IncomingRequest request,
 1182            CancellationToken cancellationToken)
 14211183        {
 14211184            Debug.Assert(_dispatcher is not null);
 1185
 1186            OutgoingResponse response;
 1187
 1188            try
 14211189            {
 14211190                if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 14211191                {
 14211192                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 14191193                }
 1194
                 // The inner try starts only after the semaphore wait succeeded, so Release is never called for a
                 // failed or canceled wait.
 1195                try
 14191196                {
 14191197                    response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13981198                }
 1199                finally
 14191200                {
 14191201                    _dispatchSemaphore?.Release();
 14191202                }
 1203
 13981204                if (response != request.Response)
 11205                {
 11206                    throw new InvalidOperationException(
 11207                        "The dispatcher did not return the last response created for this request.");
 1208                }
 13971209            }
 121210            catch (OperationCanceledException exception) when (cancellationToken == exception.CancellationToken)
 111211            {
 111212                throw;
 1213            }
 131214            catch (Exception exception)
 131215            {
                 // A DispatchException is used as is; any other exception (including the InvalidOperationException
                 // thrown above) is wrapped in a DispatchException with the status code selected below.
 131216                if (exception is not DispatchException dispatchException)
 91217                {
 91218                    StatusCode statusCode = exception switch
 91219                    {
 21220                        InvalidDataException => StatusCode.InvalidData,
 01221                        NotSupportedException => StatusCode.NotSupported,
 41222                        IceRpcException iceRpcException when iceRpcException.IceRpcError == IceRpcError.TruncatedData =>
 41223                            StatusCode.TruncatedPayload,
 31224                        _ => StatusCode.InternalError
 91225                    };
 91226                    dispatchException = new DispatchException(statusCode, message: null, exception);
 91227                }
 131228                response = dispatchException.ToOutgoingResponse(request);
 131229            }
 1230
 14101231            return response;
 14101232        }
 1233
         // Decodes the request header and its fields from buffer. Throws InvalidDataException when the path is not a
         // valid service address path; DecodeFieldDictionary can also throw InvalidDataException.
 1234        static (IceRpcRequestHeader, IDictionary<RequestFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 1235            ReadOnlySequence<byte> buffer)
 14261236        {
 14261237            var decoder = new SliceDecoder(buffer);
 14261238            var header = new IceRpcRequestHeader(ref decoder);
 1239
 1240            // Ensure that the encoded path is a valid service address path.
 1241            try
 14261242            {
 14261243                ServiceAddress.CheckPath(header.Path);
 14231244            }
 31245            catch (FormatException exception)
 31246            {
 31247                throw new InvalidDataException(exception.Message, exception);
 1248            }
 1249
 14231250            (IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 14231251                DecodeFieldDictionary(
 14231252                    ref decoder,
 14391253                    (ref SliceDecoder decoder) => decoder.DecodeRequestFieldKey());
 1254
 14211255            return (header, fields, pipeReader);
 14211256        }
 1257
         // Encodes the response header (size placeholder, status code, error message when the status code is greater
         // than Ok, then the fields) into streamOutput. Throws via CheckPeerHeaderSize when the encoded header is
         // larger than the peer's limit.
 1258        void EncodeHeader(OutgoingResponse response)
 4011259        {
 4011260            var encoder = new SliceEncoder(streamOutput);
 1261
 1262            // Write the IceRpc response header.
 4011263            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 1264
 1265            // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 4011266            long headerStartPos = streamOutput.UnflushedBytes;
 1267
 4011268            encoder.EncodeStatusCode(response.StatusCode);
 4011269            if (response.StatusCode > StatusCode.Ok)
 331270            {
 331271                encoder.EncodeString(response.ErrorMessage!);
 331272            }
 1273
 4011274            EncodeFieldDictionary(
 4011275                response.Fields,
 51276                (ref SliceEncoder encoder, ResponseFieldKey key) => encoder.EncodeResponseFieldKey(key),
 4011277                ref encoder,
 4011278                streamOutput);
 1279
 1280            // We're done with the header encoding, write the header size.
             // headerStartPos is captured after the placeholder, so the size does not include the placeholder itself.
 4001281            int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 4001282            CheckPeerHeaderSize(headerSize);
 3991283            SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 3991284        }
 14321285    }
 1286
 1287    /// <summary>Encodes the fields dictionary at the end of a request or response header.</summary>
 1288    /// <remarks>This method can write bytes directly to <paramref name="output"/> without going through
 1289    /// <paramref name="encoder"/>.</remarks>
         /// <param name="fields">The fields to encode.</param>
         /// <param name="encodeKeyAction">The action used to encode each field key.</param>
         /// <param name="encoder">The encoder used to encode the dictionary, the keys and the value sizes.</param>
         /// <param name="output">The pipe writer given to the write action of a field value.</param>
 1290    private void EncodeFieldDictionary<TKey>(
 1291        IDictionary<TKey, OutgoingFieldValue> fields,
 1292        EncodeAction<TKey> encodeKeyAction,
 1293        ref SliceEncoder encoder,
 1294        PipeWriter output) where TKey : struct =>
 18321295        encoder.EncodeDictionary(
 18321296            fields,
 18321297            encodeKeyAction,
 18321298            (ref SliceEncoder encoder, OutgoingFieldValue value) =>
 191299                {
 191300                    if (value.WriteAction is Action<IBufferWriter<byte>> writeAction)
 91301                    {
                         // The size of the value is not known before the write action runs: reserve a placeholder of
                         // _headerSizeLength bytes, let the action write to output, then fill the placeholder with the
                         // number of bytes the action added to output.
 91302                        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 91303                        long startPos = output.UnflushedBytes;
 91304                        writeAction(output);
 81305                        SliceEncoder.EncodeVarUInt62((ulong)(output.UnflushedBytes - startPos), sizePlaceholder);
 81306                    }
 18321307                    else
 101308                    {
                         // The value is a byte sequence: encode its length followed by its bytes. The checked cast
                         // throws OverflowException when the length does not fit in an int.
 101309                        encoder.EncodeSize(checked((int)value.ByteSequence.Length));
 101310                        encoder.WriteByteSequence(value.ByteSequence);
 101311                    }
 18501312                });
 1313
 1314    /// <summary>Increments the dispatch-invocation count.</summary>
 1315    /// <remarks>This method must be called with _mutex locked.</remarks>
 1316    private void IncrementDispatchInvocationCount()
 28841317    {
 28841318        if (_dispatchInvocationCount++ == 0 && _streamInputOutputCount == 0)
 8071319        {
 1320            // Cancel inactivity check.
 8071321            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 8071322        }
 28841323    }
 1324
 1325    /// <summary>Increments the stream input/output count.</summary>
 1326    /// <remarks>This method must be called with _mutex locked.</remarks>
 1327    private void IncrementStreamInputOutputCount(bool bidirectional)
 28631328    {
 28631329        Debug.Assert(_dispatchInvocationCount > 0);
 28631330        _streamInputOutputCount += bidirectional ? 2 : 1;
 28631331    }
 1332
 1333    private async Task ReadGoAwayAsync(CancellationToken cancellationToken)
 3311334    {
 3311335        await Task.Yield(); // exit mutex lock
 1336
 1337        // Wait for _connectTask (which spawned the task running this method) to complete. This await can't fail.
 1338        // This guarantees this method won't request a shutdown until after _connectTask completed successfully.
 3311339        await _connectTask!.ConfigureAwait(false);
 1340
 3311341        PipeReader remoteInput = _remoteControlStream!.Input!;
 1342
 1343        try
 3311344        {
 1345            // Wait to receive the GoAway frame.
 3311346            await ReceiveControlFrameHeaderAsync(IceRpcControlFrameType.GoAway, cancellationToken)
 3311347                .ConfigureAwait(false);
 1348
 871349            ReadResult readResult = await remoteInput.ReadSliceSegmentAsync(
 871350                MaxGoAwayFrameBodySize,
 871351                cancellationToken).ConfigureAwait(false);
 1352
 1353            // We don't call CancelPendingRead on remoteInput
 851354            Debug.Assert(!readResult.IsCanceled);
 1355
 1356            try
 851357            {
 851358                _goAwayFrame =
 1701359                    readResult.Buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new IceRpcGoAway(ref decoder));
 841360            }
 1361            finally
 851362            {
 851363                remoteInput.AdvanceTo(readResult.Buffer.End);
 851364            }
 1365
 841366            RefuseNewInvocations("The connection was shut down because it received a GoAway frame from the peer.");
 841367            _goAwayCts.Cancel();
 841368            _ = _shutdownRequestedTcs.TrySetResult();
 841369        }
 1651370        catch (OperationCanceledException)
 1651371        {
 1372            // The connection is disposed and we let this exception cancel the task.
 1651373            throw;
 1374        }
 771375        catch (IceRpcException)
 771376        {
 1377            // We let the task complete with this expected exception.
 771378            throw;
 1379        }
 51380        catch (InvalidDataException exception)
 51381        {
 1382            // "expected" in the sense it should not trigger a Debug.Fail.
 51383            throw new IceRpcException(
 51384                IceRpcError.IceRpcError,
 51385                "The ReadGoAway task was aborted by an icerpc protocol error.",
 51386                exception);
 1387        }
 01388        catch (Exception exception)
 01389        {
 01390            Debug.Fail($"The read go away task failed with an unexpected exception: {exception}");
 01391            throw;
 1392        }
 841393    }
 1394
 1395    private async ValueTask ReceiveControlFrameHeaderAsync(
 1396        IceRpcControlFrameType expectedFrameType,
 1397        CancellationToken cancellationToken)
 6751398    {
 6751399        ReadResult readResult = await _remoteControlStream!.Input.ReadAsync(cancellationToken).ConfigureAwait(false);
 1400
 1401        // We don't call CancelPendingRead on _remoteControlStream.Input.
 4271402        Debug.Assert(!readResult.IsCanceled);
 1403
 4271404        if (readResult.Buffer.IsEmpty)
 11405        {
 11406            throw new InvalidDataException(
 11407                "Failed to read the frame type because no more data is available from the control stream.");
 1408        }
 1409
 4261410        var frameType = (IceRpcControlFrameType)readResult.Buffer.FirstSpan[0];
 4261411        if (frameType != expectedFrameType)
 31412        {
 31413            throw new InvalidDataException($"Received frame type {frameType} but expected {expectedFrameType}.");
 1414        }
 4231415        _remoteControlStream!.Input.AdvanceTo(readResult.Buffer.GetPosition(1));
 4231416    }
 1417
 1418    private async ValueTask ReceiveSettingsFrameBody(CancellationToken cancellationToken)
 3361419    {
 1420        // We are still in the single-threaded initialization at this point.
 1421
 3361422        PipeReader input = _remoteControlStream!.Input;
 3361423        ReadResult readResult = await input.ReadSliceSegmentAsync(
 3361424            MaxSettingsFrameBodySize,
 3361425            cancellationToken).ConfigureAwait(false);
 1426
 1427        // We don't call CancelPendingRead on _remoteControlStream.Input
 3351428        Debug.Assert(!readResult.IsCanceled);
 1429
 1430        try
 3351431        {
 3351432            IceRpcSettings settings =
 6701433                readResult.Buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new IceRpcSettings(ref decoder));
 1434
 3321435            if (settings.Value.TryGetValue(IceRpcSettingKey.MaxHeaderSize, out ulong value))
 31436            {
 1437                // a varuint62 always fits in a long
 1438                try
 31439                {
 31440                    _maxPeerHeaderSize = ConnectionOptions.IceRpcCheckMaxHeaderSize((long)value);
 21441                }
 11442                catch (ArgumentOutOfRangeException exception)
 11443                {
 11444                    throw new InvalidDataException("Received invalid maximum header size setting.", exception);
 1445                }
 21446                _headerSizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 21447            }
 1448            // all other settings are unknown and ignored
 3311449        }
 1450        finally
 3351451        {
 3351452            input.AdvanceTo(readResult.Buffer.End);
 3351453        }
 3311454    }
 1455
 1456    private void RefuseNewInvocations(string message)
 6691457    {
 1458        lock (_mutex)
 6691459        {
 6691460            _refuseInvocations = true;
 6691461            _invocationRefusedMessage ??= message;
 6691462        }
 6691463    }
 1464
 1465    // The inactivity check executes once in _inactivityTimeout. By then either:
 1466    // - the connection is no longer inactive (and the inactivity check is canceled or being canceled)
 1467    // - the connection is still inactive and we request shutdown
 1468    private void ScheduleInactivityCheck() =>
 10981469        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1470
 1471    private ValueTask<FlushResult> SendControlFrameAsync(
 1472        IceRpcControlFrameType frameType,
 1473        EncodeAction encodeAction,
 1474        CancellationToken cancellationToken)
 4531475    {
 4531476        PipeWriter output = _controlStream!.Output;
 1477
 4531478        EncodeFrame(output);
 1479
 4531480        return output.FlushAsync(cancellationToken); // Flush
 1481
 1482        void EncodeFrame(IBufferWriter<byte> buffer)
 4531483        {
 4531484            var encoder = new SliceEncoder(buffer);
 4531485            encoder.EncodeIceRpcControlFrameType(frameType);
 4531486            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 4531487            int startPos = encoder.EncodedByteCount; // does not include the size
 4531488            encodeAction.Invoke(ref encoder);
 4531489            int frameSize = encoder.EncodedByteCount - startPos;
 4531490            SliceEncoder.EncodeVarUInt62((uint)frameSize, sizePlaceholder);
 4531491        }
 4531492    }
 1493
 1494    /// <summary>Sends the payload continuation of an outgoing request in the background.</summary>
 1495    /// <remarks>We send the payload continuation on a separate thread with Task.Run: this ensures that the synchronous
 1496    /// activity that could result from reading or writing the payload continuation doesn't delay in any way the
 1497    /// caller. </remarks>
 1498    /// <param name="request">The outgoing request.</param>
 1499    /// <param name="payloadWriter">The payload writer.</param>
 1500    /// <param name="writesClosed">A task that completes when we can no longer write to payloadWriter.</param>
 1501    /// <param name="onGoAway">An action to execute with a CTS when we receive the GoAway frame from the peer.</param>
 1502    /// <param name="cancellationToken">The cancellation token of the invocation; the associated CTS is disposed when
 1503    /// the invocation completes.</param>
 1504    private void SendRequestPayloadContinuation(
 1505        OutgoingRequest request,
 1506        PipeWriter payloadWriter,
 1507        Task writesClosed,
 1508        Action<object?> onGoAway,
 1509        CancellationToken cancellationToken)
 121510    {
 121511        Debug.Assert(request.PayloadContinuation is not null);
 1512
 1513        // First "detach" the continuation.
 121514        PipeReader payloadContinuation = request.PayloadContinuation;
 121515        request.PayloadContinuation = null;
 1516
 1517        lock (_mutex)
 121518        {
 121519            Debug.Assert(_dispatchInvocationCount > 0); // as a result, can't be disposed.
 1520
 1521            // Give the task its own dispatch-invocation count. This ensures the transport connection won't be disposed
 1522            // while the continuation is being sent.
 121523            IncrementDispatchInvocationCount();
 121524        }
 1525
 1526        // This background task owns payloadContinuation, payloadWriter and 1 dispatch-invocation count, and must clean
 1527        // them up. Hence CancellationToken.None.
 121528        _ = Task.Run(PerformSendRequestPayloadContinuationAsync, CancellationToken.None);
 1529
 1530        async Task PerformSendRequestPayloadContinuationAsync()
 121531        {
 121532            bool success = false;
 1533
 1534            try
 121535            {
 1536                // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 121537                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 1538
 1539                // This token registration is needed for one-way requests and is redundant for two-way requests.
 1540                // We want GoAway to cancel the sending of one-way requests that have not been received by the peer,
 1541                // especially when these requests have payload continuations.
 121542                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(onGoAway, cts);
 1543
 1544                try
 121545                {
 1546                    // The cancellation of the InvokeAsync's cancellationToken cancels cts only until InvokeAsync's
 1547                    // PerformInvokeAsync completes. Afterwards, the cancellation of InvokeAsync's cancellationToken has
 1548                    // no effect on cts, so it doesn't cancel the copying of payloadContinuation.
 121549                    FlushResult flushResult = await payloadWriter.CopyFromAsync(
 121550                        payloadContinuation,
 121551                        writesClosed,
 121552                        endStream: true,
 121553                        cts.Token).ConfigureAwait(false);
 1554
 51555                    success = !flushResult.IsCanceled;
 51556                }
 31557                catch (OperationCanceledException exception) when (exception.CancellationToken == cts.Token)
 21558                {
 1559                    // Process/translate this exception primarily for the benefit of _taskExceptionObserver.
 1560
 1561                    // Can be because cancellationToken was canceled by DisposeAsync or GoAway; that's fine.
 21562                    cancellationToken.ThrowIfCancellationRequested();
 1563
 11564                    if (_disposedCts.IsCancellationRequested)
 01565                    {
 1566                        // DisposeAsync aborted the request.
 01567                        throw new IceRpcException(IceRpcError.OperationAborted);
 1568                    }
 1569                    else
 11570                    {
 1571                        // When _goAwayCts is canceled and onGoAway cancels its argument:
 1572                        // - if PerformInvokeAsync is no longer running (typical for a one-way request), we get here
 1573                        // - if PerformInvokeAsync is still running, we may get here or cancellationToken gets canceled
 1574                        // first.
 11575                        Debug.Assert(_goAwayCts.IsCancellationRequested);
 11576                        throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 1577                    }
 1578                }
 51579            }
 71580            catch (Exception exception) when (_taskExceptionObserver is not null)
 51581            {
 51582                _taskExceptionObserver.RequestPayloadContinuationFailed(
 51583                    request,
 51584                    _connectionContext!.TransportConnectionInformation,
 51585                    exception);
 51586            }
 11587            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 11588            {
 1589                // Expected.
 11590            }
 11591            catch (IceRpcException)
 11592            {
 1593                // Expected, with for example IceRpcError.ConnectionAborted when the peer aborts the connection.
 11594            }
 01595            catch (Exception exception)
 01596            {
 1597                // This exception is unexpected when running the IceRPC test suite. A test that expects such an
 1598                // exception must install a task exception observer.
 01599                Debug.Fail($"Failed to send payload continuation of request {request}: {exception}");
 1600
 1601                // If Debug is not enabled and there is no task exception observer, we rethrow to generate an
 1602                // Unobserved Task Exception.
 01603                throw;
 1604            }
 1605            finally
 121606            {
 121607                payloadWriter.CompleteOutput(success);
 121608                payloadContinuation.Complete();
 121609                DecrementDispatchInvocationCount();
 121610            }
 121611        }
 121612    }
 1613}

Methods/Properties

get_IsServer()
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.Transports.TransportConnectionInformation,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DisposeAsync()
PerformDisposeAsync()
InvokeAsync(IceRpc.OutgoingRequest,System.Threading.CancellationToken)
PerformInvokeAsync()
OnGoAway()
DecodeHeader()
EncodeHeader()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
DecodeFieldDictionary(ZeroC.Slice.Codec.SliceDecoder&,ZeroC.Slice.Codec.DecodeFunc`1<TKey>)
AcceptRequestsAsync()
CheckPeerHeaderSize(System.Int32)
DecrementDispatchInvocationCount()
DecrementStreamInputOutputCount()
DispatchRequestAsync()
PerformDispatchRequestAsync()
DecodeHeader()
EncodeHeader()
EncodeFieldDictionary(System.Collections.Generic.IDictionary`2<TKey,IceRpc.OutgoingFieldValue>,ZeroC.Slice.Codec.EncodeAction`1<TKey>,ZeroC.Slice.Codec.SliceEncoder&,System.IO.Pipelines.PipeWriter)
IncrementDispatchInvocationCount()
IncrementStreamInputOutputCount(System.Boolean)
ReadGoAwayAsync()
ReceiveControlFrameHeaderAsync()
ReceiveSettingsFrameBody()
RefuseNewInvocations(System.String)
ScheduleInactivityCheck()
SendControlFrameAsync(IceRpc.Internal.IceRpcControlFrameType,ZeroC.Slice.Codec.EncodeAction,System.Threading.CancellationToken)
EncodeFrame()
SendRequestPayloadContinuation(IceRpc.OutgoingRequest,System.IO.Pipelines.PipeWriter,System.Threading.Tasks.Task,System.Action`1<System.Object>,System.Threading.CancellationToken)
PerformSendRequestPayloadContinuationAsync()