< Summary

Information
Class: IceRpc.Internal.IceRpcProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs
Tag: 701_22528036593
Line coverage
91%
Covered lines: 858
Uncovered lines: 79
Coverable lines: 937
Total lines: 1599
Line coverage: 91.5%
Branch coverage
89%
Covered branches: 200
Total branches: 224
Branch coverage: 89.2%
Method coverage
100%
Covered methods: 34
Total methods: 34
Method coverage: 100%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs

#  Line  Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Collections.Immutable;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Security.Authentication;
 9using ZeroC.Slice;
 10
 11namespace IceRpc.Internal;
 12
 13internal sealed class IceRpcProtocolConnection : IProtocolConnection
 14{
 15    private const int MaxGoAwayFrameBodySize = 16;
 16    private const int MaxSettingsFrameBodySize = 1024;
 17
 148718    private bool IsServer => _transportConnectionInformation is not null;
 19
 20    private Task? _acceptRequestsTask;
 21
 22    private Task? _connectTask;
 23    private IConnectionContext? _connectionContext; // non-null once the connection is established
 24    private IMultiplexedStream? _controlStream;
 25
 26    // The number of outstanding dispatches and invocations.
 27    // DisposeAsync waits until this count reaches 0 (using _dispatchesAndInvocationsCompleted) before disposing the
 28    // underlying transport connection. So when this count is greater than 0, we know _transportConnection and other
 29    // fields are not disposed.
 30    // _dispatchInvocationCount is also used for the inactivity check: the connection remains active while
 31    // _dispatchInvocationCount > 0 or _streamInputOutputCount > 0.
 32    private int _dispatchInvocationCount;
 33
 34    private readonly SemaphoreSlim? _dispatchSemaphore;
 35
 36    private readonly IDispatcher? _dispatcher;
 37137    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 37138        new(TaskCreationOptions.RunContinuationsAsynchronously);
 39
 40    private Task? _disposeTask;
 41
 42    // This cancellation token source is canceled when the connection is disposed.
 37143    private readonly CancellationTokenSource _disposedCts = new();
 44
 45    // Canceled when we receive the GoAway frame from the peer.
 37146    private readonly CancellationTokenSource _goAwayCts = new();
 47
 48    // The GoAway frame received from the peer. Read it only after _goAwayCts is canceled.
 49    private IceRpcGoAway _goAwayFrame;
 50
 51    // The number of bytes we need to encode a size up to _maxPeerHeaderSize. It's 2 for DefaultMaxIceRpcHeaderSize.
 37152    private int _headerSizeLength = 2;
 53
 54    private readonly TimeSpan _inactivityTimeout;
 55    private readonly Timer _inactivityTimeoutTimer;
 56    private string? _invocationRefusedMessage;
 57
 58    // The ID of the last bidirectional stream accepted by this connection. It's null as long as no bidirectional stream
 59    // was accepted.
 60    private ulong? _lastRemoteBidirectionalStreamId;
 61
 62    // The ID of the last unidirectional stream accepted by this connection. It's null as long as no unidirectional
 63    // stream (other than _remoteControlStream) was accepted.
 64    private ulong? _lastRemoteUnidirectionalStreamId;
 65
 66    private readonly int _maxLocalHeaderSize;
 37167    private int _maxPeerHeaderSize = ConnectionOptions.DefaultMaxIceRpcHeaderSize;
 68
 37169    private readonly Lock _mutex = new();
 70
 71    private Task? _readGoAwayTask;
 72
 73    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 74    private bool _refuseInvocations;
 75
 76    private IMultiplexedStream? _remoteControlStream;
 77
 78    private readonly CancellationTokenSource _shutdownOrGoAwayCts;
 79
 80    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 81    // holding a lock on _mutex.
 37182    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 83
 84    private Task? _shutdownTask;
 85
 86    // Keeps track of the number of stream Input and Output that are not completed yet.
 87    // It's not the same as the _dispatchInvocationCount: a dispatch or invocation can be completed while the
 88    // application is still reading an incoming frame payload that corresponds to a stream input.
 89    // ShutdownAsync waits for both _streamInputOutputCount and _dispatchInvocationCount to reach 0, while DisposeAsync
 90    // only waits for _dispatchInvocationCount to reach 0.
 91    private int _streamInputOutputCount;
 92
 93    // The streams are completed when _shutdownTask is not null and _streamInputOutputCount is 0.
 37194    private readonly TaskCompletionSource _streamsCompleted = new(TaskCreationOptions.RunContinuationsAsynchronously);
 95
 96    private readonly ITaskExceptionObserver? _taskExceptionObserver;
 97
 98    private readonly IMultiplexedConnection _transportConnection;
 99
 100    // Only set for server connections.
 101    private readonly TransportConnectionInformation? _transportConnectionInformation;
 102
 103    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 104        CancellationToken cancellationToken)
 364105    {
 106        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 107
 108        lock (_mutex)
 364109        {
 364110            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 111
 362112            if (_connectTask is not null)
 0113            {
 0114                throw new InvalidOperationException("Cannot call connect more than once.");
 115            }
 116
 362117            result = PerformConnectAsync();
 362118            _connectTask = result;
 362119        }
 362120        return result;
 121
 122        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 362123        {
 124            // Make sure we execute the function without holding the connection mutex lock.
 362125            await Task.Yield();
 126
 127            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 362128            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 362129                cancellationToken,
 362130                _disposedCts.Token);
 131
 132            TransportConnectionInformation transportConnectionInformation;
 133
 134            try
 362135            {
 136                // If the transport connection information is null, we need to connect the transport connection. It's
 137                // null for client connections. The transport connection of a server connection is established by
 138                // Server.
 362139                transportConnectionInformation = _transportConnectionInformation ??
 362140                    await _transportConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 141
 337142                _controlStream = await _transportConnection.CreateStreamAsync(
 337143                    false,
 337144                    connectCts.Token).ConfigureAwait(false);
 145
 333146                var settings = new IceRpcSettings(
 333147                    _maxLocalHeaderSize == ConnectionOptions.DefaultMaxIceRpcHeaderSize ?
 333148                        ImmutableDictionary<IceRpcSettingKey, ulong>.Empty :
 333149                        new Dictionary<IceRpcSettingKey, ulong>
 333150                        {
 333151                            [IceRpcSettingKey.MaxHeaderSize] = (ulong)_maxLocalHeaderSize
 333152                        });
 153
 154                try
 333155                {
 333156                    await SendControlFrameAsync(
 333157                        IceRpcControlFrameType.Settings,
 333158                        settings.Encode,
 333159                        connectCts.Token).ConfigureAwait(false);
 329160                }
 4161                catch
 4162                {
 163                    // If we fail to send the Settings frame, we are in an abortive closure and we close Output to allow
 164                    // the peer to continue if it's waiting for us. This could happen when the cancellation token is
 165                    // canceled.
 4166                    _controlStream!.Output.CompleteOutput(success: false);
 4167                    throw;
 168                }
 169
 170                // Wait for the remote control stream to be accepted and read the protocol Settings frame
 329171                _remoteControlStream = await _transportConnection.AcceptStreamAsync(
 329172                    connectCts.Token).ConfigureAwait(false);
 173
 310174                await ReceiveControlFrameHeaderAsync(
 310175                    IceRpcControlFrameType.Settings,
 310176                    connectCts.Token).ConfigureAwait(false);
 177
 304178                await ReceiveSettingsFrameBody(connectCts.Token).ConfigureAwait(false);
 299179            }
 29180            catch (OperationCanceledException)
 29181            {
 29182                cancellationToken.ThrowIfCancellationRequested();
 183
 9184                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 9185                throw new IceRpcException(
 9186                    IceRpcError.OperationAborted,
 9187                    "The connection establishment was aborted because the connection was disposed.");
 188            }
 7189            catch (InvalidDataException exception)
 7190            {
 7191                throw new IceRpcException(
 7192                    IceRpcError.ConnectionAborted,
 7193                    "The connection establishment was aborted by an icerpc protocol error.",
 7194                    exception);
 195            }
 1196            catch (AuthenticationException)
 1197            {
 1198                throw;
 199            }
 26200            catch (IceRpcException)
 26201            {
 26202                throw;
 203            }
 0204            catch (Exception exception)
 0205            {
 0206                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0207                throw;
 208            }
 209
 210            // This needs to be set before starting the accept requests task below.
 299211            _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 212
 213            // We assign _readGoAwayTask and _acceptRequestsTask with _mutex locked to make sure this assignment
 214            // occurs before the start of DisposeAsync. Once _disposeTask is not null, _readGoAwayTask etc are
 215            // immutable.
 216            lock (_mutex)
 299217            {
 299218                if (_disposeTask is not null)
 1219                {
 1220                    throw new IceRpcException(
 1221                        IceRpcError.OperationAborted,
 1222                        "The connection establishment was aborted because the connection was disposed.");
 223                }
 224
 225                // Read the go away frame from the control stream.
 298226                _readGoAwayTask = ReadGoAwayAsync(_disposedCts.Token);
 227
 228                // Start a task that accepts requests (the "accept requests loop")
 298229                _acceptRequestsTask = AcceptRequestsAsync(_shutdownOrGoAwayCts.Token);
 298230            }
 231
 232            // The _acceptRequestsTask waits for this PerformConnectAsync completion before reading anything. As soon as
 233            // it receives a request, it will cancel this inactivity check.
 298234            ScheduleInactivityCheck();
 235
 298236            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 298237        }
 362238    }
 239
 240    public ValueTask DisposeAsync()
 397241    {
 242        lock (_mutex)
 397243        {
 397244            if (_disposeTask is null)
 371245            {
 371246                RefuseNewInvocations("The connection was disposed.");
 247
 371248                if (_streamInputOutputCount == 0)
 359249                {
 250                    // That's only for consistency. _streamsCompleted.Task matters only to ShutdownAsync.
 359251                    _streamsCompleted.TrySetResult();
 359252                }
 371253                if (_dispatchInvocationCount == 0)
 363254                {
 363255                    _dispatchesAndInvocationsCompleted.TrySetResult();
 363256                }
 257
 371258                _shutdownTask ??= Task.CompletedTask;
 371259                _disposeTask = PerformDisposeAsync();
 371260            }
 397261        }
 397262        return new(_disposeTask);
 263
 264        async Task PerformDisposeAsync()
 371265        {
 266            // Make sure we execute the code below without holding the mutex lock.
 371267            await Task.Yield();
 268
 371269            _disposedCts.Cancel();
 270
 271            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 272
 371273            if (_connectTask is not null)
 362274            {
 275                // We wait for _dispatchesAndInvocationsCompleted (since dispatches and invocations are somewhat under
 276                // our control), but not for _streamsCompleted, since we can't make the application complete the
 277                // incoming payload pipe readers.
 278                try
 362279                {
 362280                    await Task.WhenAll(
 362281                        _connectTask,
 362282                        _acceptRequestsTask ?? Task.CompletedTask,
 362283                        _readGoAwayTask ?? Task.CompletedTask,
 362284                        _shutdownTask,
 362285                        _dispatchesAndInvocationsCompleted.Task).ConfigureAwait(false);
 73286                }
 289287                catch
 289288                {
 289                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 290                    // unexpected exceptions so there's no need to handle them here.
 289291                }
 362292            }
 293
 294            // If the application is still reading some incoming payload, the disposal of the transport connection can
 295            // abort this reading.
 371296            await _transportConnection.DisposeAsync().ConfigureAwait(false);
 297
 298            // It's safe to complete the output since write operations have been completed by the transport connection
 299            // disposal.
 371300            _controlStream?.Output.Complete();
 301
 302            // It's safe to complete the input since read operations have been completed by the transport connection
 303            // disposal.
 371304            _remoteControlStream?.Input.Complete();
 305
 371306            _dispatchSemaphore?.Dispose();
 371307            _disposedCts.Dispose();
 371308            _goAwayCts.Dispose();
 371309            _shutdownOrGoAwayCts.Dispose();
 310
 371311            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 371312        }
 397313    }
 314
 315    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1437316    {
 1437317        if (request.Protocol != Protocol.IceRpc)
 1318        {
 1319            throw new InvalidOperationException(
 1320                $"Cannot send {request.Protocol} request on {Protocol.IceRpc} connection.");
 321        }
 322
 323        lock (_mutex)
 1436324        {
 1436325            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 326
 1434327            if (_refuseInvocations)
 1328            {
 1329                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 330            }
 1433331            if (_connectTask is null)
 0332            {
 0333                throw new InvalidOperationException("Cannot invoke on a connection before connecting it.");
 334            }
 1433335            if (!IsServer && !_connectTask.IsCompletedSuccessfully)
 0336            {
 0337                throw new InvalidOperationException(
 0338                    "Cannot invoke on a client connection that is not fully established.");
 339            }
 340            // It's possible but rare to invoke on a server connection that is still connecting.
 341
 1433342            if (request.ServiceAddress.Fragment.Length > 0)
 0343            {
 0344                throw new NotSupportedException("The icerpc protocol does not support fragments.");
 345            }
 346
 1433347            IncrementDispatchInvocationCount();
 1433348        }
 349
 1433350        return PerformInvokeAsync();
 351
 352        async Task<IncomingResponse> PerformInvokeAsync()
 1433353        {
 354            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1433355            using var invocationCts = CancellationTokenSource.CreateLinkedTokenSource(
 1433356                cancellationToken,
 1433357                _disposedCts.Token);
 358
 1433359            PipeReader? streamInput = null;
 360
 361            // This try/catch block cleans up streamInput (when not null) and decrements the dispatch-invocation count.
 362            try
 1433363            {
 364                // Create the stream.
 365                IMultiplexedStream stream;
 366                try
 1433367                {
 368                    // We want to cancel CreateStreamAsync as soon as the connection is being shutdown or received a
 369                    // GoAway frame.
 1433370                    using CancellationTokenRegistration _ = _shutdownOrGoAwayCts.Token.UnsafeRegister(
 5371                        cts => ((CancellationTokenSource)cts!).Cancel(),
 1433372                        invocationCts);
 373
 1433374                    stream = await _transportConnection.CreateStreamAsync(
 1433375                        bidirectional: !request.IsOneway,
 1433376                        invocationCts.Token).ConfigureAwait(false);
 377
 1424378                    streamInput = stream.IsBidirectional ? stream.Input : null;
 1424379                }
 8380                catch (OperationCanceledException)
 8381                {
 8382                    cancellationToken.ThrowIfCancellationRequested();
 383
 384                    // Connection was shutdown or disposed and we did not read the payload at all.
 6385                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 386                }
 1387                catch (IceRpcException exception)
 1388                {
 1389                    RefuseNewInvocations("The connection was lost.");
 1390                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 391                }
 0392                catch (Exception exception)
 0393                {
 0394                    Debug.Fail($"CreateStreamAsync failed with an unexpected exception: {exception}");
 0395                    RefuseNewInvocations("The connection was lost.");
 0396                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 397                }
 398
 1424399                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(
 1424400                    OnGoAway,
 1424401                    invocationCts);
 402
 403                PipeWriter payloadWriter;
 404
 405                try
 1424406                {
 407                    lock (_mutex)
 1424408                    {
 1424409                        if (_refuseInvocations)
 0410                        {
 411                            // Both stream.Input and stream.Output are completed by the catch and finally blocks below.
 0412                            throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 413                        }
 414
 1424415                        IncrementStreamInputOutputCount(stream.IsBidirectional);
 416
 417                        // Decorate the stream to decrement the input/output count on Complete.
 1424418                        stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 1424419                        streamInput = stream.IsBidirectional ? stream.Input : null;
 1424420                    }
 421
 1424422                    EncodeHeader(stream.Output);
 1423423                    payloadWriter = request.GetPayloadWriter(stream.Output);
 1423424                }
 1425                catch
 1426                {
 1427                    stream.Output.CompleteOutput(success: false);
 1428                    throw;
 429                }
 430
 431                // From now on, we only use payloadWriter to write and we make sure to complete it.
 432
 1423433                bool hasContinuation = request.PayloadContinuation is not null;
 434                FlushResult flushResult;
 435
 436                try
 1423437                {
 1423438                    flushResult = await payloadWriter.CopyFromAsync(
 1423439                        request.Payload,
 1423440                        stream.WritesClosed,
 1423441                        endStream: !hasContinuation,
 1423442                        invocationCts.Token).ConfigureAwait(false);
 1415443                }
 8444                catch
 8445                {
 8446                    payloadWriter.CompleteOutput(success: false);
 8447                    request.PayloadContinuation?.Complete();
 8448                    throw;
 449                }
 450                finally
 1423451                {
 1423452                    request.Payload.Complete();
 1423453                }
 454
 1415455                if (flushResult.IsCompleted || flushResult.IsCanceled || !hasContinuation)
 1403456                {
 457                    // The remote reader doesn't want more data, or the copying was canceled, or there is no
 458                    // continuation: we're done.
 1403459                    payloadWriter.CompleteOutput(!flushResult.IsCanceled);
 1403460                    request.PayloadContinuation?.Complete();
 1403461                }
 462                else
 12463                {
 464                    // Sends the payload continuation in a background thread.
 12465                    SendRequestPayloadContinuation(
 12466                        request,
 12467                        payloadWriter,
 12468                        stream.WritesClosed,
 12469                        OnGoAway,
 12470                        invocationCts.Token);
 12471                }
 472
 1415473                if (request.IsOneway)
 1014474                {
 1014475                    return new IncomingResponse(request, _connectionContext!);
 476                }
 477
 401478                Debug.Assert(streamInput is not null);
 479
 480                try
 401481                {
 401482                    ReadResult readResult = await streamInput.ReadSegmentAsync(
 401483                        SliceEncoding.Slice2,
 401484                        _maxLocalHeaderSize,
 401485                        invocationCts.Token).ConfigureAwait(false);
 486
 487                    // Nothing cancels the stream input pipe reader.
 380488                    Debug.Assert(!readResult.IsCanceled);
 489
 380490                    if (readResult.Buffer.IsEmpty)
 0491                    {
 0492                        throw new IceRpcException(
 0493                            IceRpcError.IceRpcError,
 0494                            "Received an icerpc response with an empty header.");
 495                    }
 496
 380497                    (StatusCode statusCode, string? errorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? fieldsPipeReader) =
 380498                        DecodeHeader(readResult.Buffer);
 380499                    stream.Input.AdvanceTo(readResult.Buffer.End);
 500
 380501                    if (statusCode == StatusCode.TruncatedPayload && invocationCts.Token.IsCancellationRequested)
 0502                    {
 503                        // Canceling the sending of the payload continuation triggers the completion of the stream
 504                        // output. This may lead to a TruncatedPayload if the dispatch is currently reading the payload
 505                        // continuation. In such cases, we prioritize throwing an OperationCanceledException.
 0506                        fieldsPipeReader?.Complete();
 0507                        invocationCts.Token.ThrowIfCancellationRequested();
 0508                    }
 509
 380510                    var response = new IncomingResponse(
 380511                        request,
 380512                        _connectionContext!,
 380513                        statusCode,
 380514                        errorMessage,
 380515                        fields,
 380516                        fieldsPipeReader)
 380517                    {
 380518                        Payload = streamInput
 380519                    };
 520
 380521                    streamInput = null; // response now owns the stream input
 380522                    return response;
 523                }
 1524                catch (InvalidDataException exception)
 1525                {
 1526                    throw new IceRpcException(
 1527                        IceRpcError.IceRpcError,
 1528                        "Received an icerpc response with an invalid header.",
 1529                        exception);
 530                }
 531
 532                void OnGoAway(object? cts)
 14533                {
 14534                    if (!stream.IsStarted ||
 14535                        stream.Id >=
 14536                            (stream.IsBidirectional ?
 14537                                _goAwayFrame.BidirectionalStreamId :
 14538                                _goAwayFrame.UnidirectionalStreamId))
 4539                    {
 540                        // The request wasn't received by the peer so it's safe to cancel the invocation.
 4541                        ((CancellationTokenSource)cts!).Cancel();
 4542                    }
 14543                }
 544            }
 16545            catch (OperationCanceledException exception) when (exception.CancellationToken == invocationCts.Token)
 13546            {
 13547                cancellationToken.ThrowIfCancellationRequested();
 548
 6549                if (_disposedCts.IsCancellationRequested)
 3550                {
 551                    // DisposeAsync aborted the request.
 3552                    throw new IceRpcException(IceRpcError.OperationAborted);
 553                }
 554                else
 3555                {
 3556                    Debug.Assert(_goAwayCts.IsCancellationRequested);
 3557                    throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 558                }
 559            }
 560            finally
 1433561            {
 1433562                streamInput?.Complete();
 1433563                DecrementDispatchInvocationCount();
 1433564            }
 565
 566            static (StatusCode StatusCode, string? ErrorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 567                ReadOnlySequence<byte> buffer)
 380568            {
 380569                var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 570
 380571                StatusCode statusCode = decoder.DecodeStatusCode();
 380572                string? errorMessage = statusCode == StatusCode.Ok ? null : decoder.DecodeString();
 573
 380574                (IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 380575                    DecodeFieldDictionary(
 380576                        ref decoder,
 383577                        (ref SliceDecoder decoder) => decoder.DecodeResponseFieldKey());
 578
 380579                return (statusCode, errorMessage, fields, pipeReader);
 380580            }
 581
 582            void EncodeHeader(PipeWriter streamOutput)
 1424583            {
 1424584                var encoder = new SliceEncoder(streamOutput, SliceEncoding.Slice2);
 585
 586                // Write the IceRpc request header.
 1424587                Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 588
 589                // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 1424590                long headerStartPos = streamOutput.UnflushedBytes;
 591
 1424592                var header = new IceRpcRequestHeader(request.ServiceAddress.Path, request.Operation);
 593
 1424594                header.Encode(ref encoder);
 595
 1424596                EncodeFieldDictionary(
 1424597                    request.Fields,
 14598                    (ref SliceEncoder encoder, RequestFieldKey key) => encoder.EncodeRequestFieldKey(key),
 1424599                    ref encoder,
 1424600                    streamOutput);
 601
 602                // We're done with the header encoding, write the header size.
 1424603                int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 1424604                CheckPeerHeaderSize(headerSize);
 1423605                SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 1423606            }
 1394607        }
 1433608    }
 609
 610    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 101611    {
 612        lock (_mutex)
 101613        {
 101614            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 615
 99616            if (_shutdownTask is not null)
 2617            {
 2618                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 619            }
 97620            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3621            {
 3622                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 623            }
 624
 94625            RefuseNewInvocations("The connection was shut down.");
 626
 94627            if (_streamInputOutputCount == 0)
 71628            {
 71629                _streamsCompleted.TrySetResult();
 71630            }
 94631            if (_dispatchInvocationCount == 0)
 69632            {
 69633                _dispatchesAndInvocationsCompleted.TrySetResult();
 69634            }
 635
 94636            _shutdownTask = PerformShutdownAsync();
 94637        }
 94638        return _shutdownTask;
 639
 640        async Task PerformShutdownAsync()
 94641        {
 94642            await Task.Yield(); // exit mutex lock
 643
 94644            _shutdownOrGoAwayCts.Cancel();
 645
 646            try
 94647            {
 94648                Debug.Assert(_acceptRequestsTask is not null);
 94649                Debug.Assert(_controlStream is not null);
 94650                Debug.Assert(_readGoAwayTask is not null);
 94651                Debug.Assert(_remoteControlStream is not null);
 652
 94653                await _acceptRequestsTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 654
 80655                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 656
 657                // Once _shutdownTask is not null, _lastRemoteBidirectionalStreamId and _lastRemoteUnidirectionalStreamId
 658                // are immutable.
 659
 660                // When this peer is the server endpoint, the first accepted bidirectional stream ID is 0. When this
 661                // peer is the client endpoint, the first accepted bidirectional stream ID is 1.
 80662                IceRpcGoAway goAwayFrame = new(
 80663                    _lastRemoteBidirectionalStreamId is ulong value ? value + 4 : (IsServer ? 0ul : 1ul),
 80664                    (_lastRemoteUnidirectionalStreamId ?? _remoteControlStream.Id) + 4);
 665
 666                try
 80667                {
 80668                    _ = await SendControlFrameAsync(
 80669                        IceRpcControlFrameType.GoAway,
 80670                        goAwayFrame.Encode,
 80671                        cts.Token).ConfigureAwait(false);
 672
 673                    // Wait for the peer to send back a GoAway frame. The task should already be completed if the
 674                    // shutdown was initiated by the peer.
 79675                    await _readGoAwayTask.WaitAsync(cts.Token).ConfigureAwait(false);
 676
 677                    // Wait for all streams (other than the control streams) to have their Input and Output completed.
 69678                    await _streamsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 679
 680                    // Close the control stream to notify the peer that on our side, all the streams completed and that
 681                    // it can close the transport connection whenever it likes.
 68682                    _controlStream.Output.CompleteOutput(success: true);
 68683                }
 12684                catch
 12685                {
 686                    // If we fail to send the GoAway frame or some other failure occur (such as
 687                    // OperationCanceledException) we are in an abortive closure and we close Output to allow
 688                    // the peer to continue if it's waiting for us.
 12689                    _controlStream.Output.CompleteOutput(success: false);
 12690                    throw;
 691                }
 692
 693                // Wait for the peer notification that on its side all the streams are completed. It's important to wait
 694                // for this notification before closing the connection. In particular with QUIC where closing the
 695                // connection before all the streams are processed could lead to a stream failure.
 696                try
 68697                {
 698                    // Wait for the _remoteControlStream Input completion.
 68699                    ReadResult readResult = await _remoteControlStream.Input.ReadAsync(cts.Token).ConfigureAwait(false);
 700
 66701                    Debug.Assert(!readResult.IsCanceled);
 702
 66703                    if (!readResult.IsCompleted || !readResult.Buffer.IsEmpty)
 0704                    {
 0705                        throw new IceRpcException(
 0706                            IceRpcError.IceRpcError,
 0707                            "Received bytes on the control stream after receiving the GoAway frame.");
 708                    }
 709
 710                    // We can now safely close the connection.
 66711                    await _transportConnection.CloseAsync(MultiplexedConnectionCloseError.NoError, cts.Token)
 66712                        .ConfigureAwait(false);
 66713                }
 2714                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.ConnectionClosedByPeer)
 0715                {
 716                    // Expected if the peer closed the connection first.
 0717                }
 718
 719                // We wait for the completion of the dispatches that we created (and, secondarily, invocations).
 66720                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 66721            }
 7722            catch (OperationCanceledException)
 7723            {
 7724                cancellationToken.ThrowIfCancellationRequested();
 725
 2726                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 2727                throw new IceRpcException(
 2728                    IceRpcError.OperationAborted,
 2729                    "The connection shutdown was aborted because the connection was disposed.");
 730            }
 0731            catch (InvalidDataException exception)
 0732            {
 0733                throw new IceRpcException(
 0734                    IceRpcError.IceRpcError,
 0735                    "The connection shutdown was aborted by an icerpc protocol error.",
 0736                    exception);
 737            }
 21738            catch (IceRpcException)
 21739            {
 21740                throw;
 741            }
 0742            catch (Exception exception)
 0743            {
 0744                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0745                throw;
 746            }
 66747        }
 94748    }
 749
 371750    internal IceRpcProtocolConnection(
 371751        IMultiplexedConnection transportConnection,
 371752        TransportConnectionInformation? transportConnectionInformation,
 371753        ConnectionOptions options,
 371754        ITaskExceptionObserver? taskExceptionObserver)
 371755    {
 371756        _shutdownOrGoAwayCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, _goAwayCts.Token);
 757
 371758        _taskExceptionObserver = taskExceptionObserver;
 759
 371760        _transportConnection = transportConnection;
 371761        _dispatcher = options.Dispatcher;
 371762        _maxLocalHeaderSize = options.MaxIceRpcHeaderSize;
 371763        _transportConnectionInformation = transportConnectionInformation;
 764
 371765        if (options.MaxDispatches > 0)
 371766        {
 371767            _dispatchSemaphore = new SemaphoreSlim(
 371768                initialCount: options.MaxDispatches,
 371769                maxCount: options.MaxDispatches);
 371770        }
 771
 371772        _inactivityTimeout = options.InactivityTimeout;
 371773        _inactivityTimeoutTimer = new Timer(_ =>
 5774        {
 5775            bool requestShutdown = false;
 371776
 371777            lock (_mutex)
 5778            {
 5779                if (_shutdownTask is null && _dispatchInvocationCount == 0 && _streamInputOutputCount == 0)
 5780                {
 5781                    requestShutdown = true;
 5782                    RefuseNewInvocations(
 5783                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 5784                }
 5785            }
 371786
 5787            if (requestShutdown)
 5788            {
 371789                // TrySetResult must be called outside the mutex lock
 5790                _shutdownRequestedTcs.TrySetResult();
 5791            }
 376792        });
 371793    }
 794
 795    private static (IDictionary<TKey, ReadOnlySequence<byte>>, PipeReader?) DecodeFieldDictionary<TKey>(
 796        ref SliceDecoder decoder,
 797        DecodeFunc<TKey> decodeKeyFunc) where TKey : struct
 1793798    {
 1793799        int count = decoder.DecodeSize();
 800
 801        IDictionary<TKey, ReadOnlySequence<byte>> fields;
 802        PipeReader? pipeReader;
 1793803        if (count == 0)
 1776804        {
 1776805            fields = ImmutableDictionary<TKey, ReadOnlySequence<byte>>.Empty;
 1776806            pipeReader = null;
 1776807            decoder.CheckEndOfBuffer();
 1776808        }
 809        else
 17810        {
 811            // We don't use the normal collection allocation check here because SizeOf<ReadOnlySequence<byte>> is quite
 812            // large (24).
 813            // For example, say we decode a fields dictionary with a single field with an empty value. It's encoded
 814            // using 1 byte (dictionary size) + 1 byte (key) + 1 byte (value size) = 3 bytes. The decoder's default max
 815            // allocation size is 3 * 8 = 24. If we simply call IncreaseCollectionAllocation(1 * (4 + 24)), we'll exceed
 816            // the default collection allocation limit. (sizeof TKey is currently 4 but could/should increase to 8).
 817
 818            // Each field consumes at least 2 bytes: 1 for the key and one for the value size.
 17819            if (count * 2 > decoder.Remaining)
 0820            {
 0821                throw new InvalidDataException("Too many fields.");
 822            }
 823
 17824            fields = new Dictionary<TKey, ReadOnlySequence<byte>>(count);
 17825            var pipe = new Pipe();
 17826            decoder.CopyTo(pipe.Writer);
 17827            pipe.Writer.Complete();
 828
 829            try
 17830            {
 17831                _ = pipe.Reader.TryRead(out ReadResult readResult);
 17832                var fieldsDecoder = new SliceDecoder(readResult.Buffer, SliceEncoding.Slice2);
 833
 68834                for (int i = 0; i < count; ++i)
 17835                {
 836                    // Decode the field key.
 17837                    TKey key = decodeKeyFunc(ref fieldsDecoder);
 838
 839                    // Decode and check the field value size.
 840                    int valueSize;
 841                    try
 17842                    {
 17843                        valueSize = checked((int)fieldsDecoder.DecodeVarUInt62());
 17844                    }
 0845                    catch (OverflowException exception)
 0846                    {
 0847                        throw new InvalidDataException("The field size can't be larger than int.MaxValue.", exception);
 848                    }
 849
 17850                    if (valueSize > fieldsDecoder.Remaining)
 0851                    {
 0852                        throw new InvalidDataException(
 0853                            $"The value of field '{key}' extends beyond the end of the buffer.");
 854                    }
 855
 856                    // Create a ROS reference to the field value by slicing the fields pipe reader ROS.
 17857                    ReadOnlySequence<byte> value = readResult.Buffer.Slice(fieldsDecoder.Consumed, valueSize);
 17858                    fields.Add(key, value);
 859
 860                    // Skip the field value to prepare the decoder to read the next field value.
 17861                    fieldsDecoder.Skip(valueSize);
 17862                }
 17863                fieldsDecoder.CheckEndOfBuffer();
 864
 17865                pipe.Reader.AdvanceTo(readResult.Buffer.Start); // complete read without consuming anything
 866
 17867                pipeReader = pipe.Reader;
 17868            }
 0869            catch
 0870            {
 0871                pipe.Reader.Complete();
 0872                throw;
 873            }
 17874        }
 875
 876        // The caller is responsible for completing the pipe reader.
 1793877        return (fields, pipeReader);
 1793878    }
 879
 880    private async Task AcceptRequestsAsync(CancellationToken cancellationToken)
 298881    {
 298882        await Task.Yield(); // exit mutex lock
 883
 884        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 885        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 886        // _connectTask has completed successfully. The creation of the _acceptRequestsTask is the last action taken by
 887        // _connectTask and as a result this await can't fail.
 298888        await _connectTask!.ConfigureAwait(false);
 889
 890        try
 298891        {
 892            // We check the cancellation token for each iteration because we want to exit the accept requests loop as
 893            // soon as ShutdownAsync/GoAway requests this cancellation, even when more streams can be accepted without
 894            // waiting.
 1718895            while (!cancellationToken.IsCancellationRequested)
 1718896            {
 897                // When _dispatcher is null, the multiplexed connection MaxUnidirectionalStreams and
 898                // MaxBidirectionalStreams options are configured to not accept any request-stream from the peer. As a
 899                // result, when _dispatcher is null, this call will block indefinitely until the cancellation token is
 900                // canceled by ShutdownAsync, GoAway or DisposeAsync.
 1718901                IMultiplexedStream stream = await _transportConnection.AcceptStreamAsync(cancellationToken)
 1718902                    .ConfigureAwait(false);
 903
 904                lock (_mutex)
 1420905                {
 906                    // We don't want to increment _dispatchInvocationCount/_streamInputOutputCount when the connection
 907                    // is shutting down or being disposed.
 1420908                    if (_shutdownTask is not null)
 0909                    {
 910                        // Note that cancellationToken may not be canceled yet at this point.
 0911                        throw new OperationCanceledException();
 912                    }
 913
 914                    // The logic in IncrementStreamInputOutputCount requires that we increment the dispatch-invocation
 915                    // count first.
 1420916                    IncrementDispatchInvocationCount();
 1420917                    IncrementStreamInputOutputCount(stream.IsBidirectional);
 918
 919                    // Decorate the stream to decrement the stream input/output count on Complete.
 1420920                    stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 921
 922                    // The multiplexed connection guarantees that the IDs of accepted streams of a given type have ever
 923                    // increasing values.
 924
 1420925                    if (stream.IsBidirectional)
 407926                    {
 407927                        _lastRemoteBidirectionalStreamId = stream.Id;
 407928                    }
 929                    else
 1013930                    {
 1013931                        _lastRemoteUnidirectionalStreamId = stream.Id;
 1013932                    }
 1420933                }
 934
 935                // Start a task to read the stream and dispatch the request. We pass CancellationToken.None to Task.Run
 936                // because DispatchRequestAsync must clean-up the stream and the dispatch-invocation count.
 2840937                _ = Task.Run(() => DispatchRequestAsync(stream), CancellationToken.None);
 1420938            }
 0939        }
 228940        catch (OperationCanceledException)
 228941        {
 942            // Expected, the associated cancellation token source was canceled.
 228943        }
 70944        catch (IceRpcException)
 70945        {
 70946            RefuseNewInvocations("The connection was lost");
 70947            _ = _shutdownRequestedTcs.TrySetResult();
 70948            throw;
 949        }
 0950        catch (Exception exception)
 0951        {
 0952            Debug.Fail($"The accept stream task failed with an unexpected exception: {exception}");
 0953            RefuseNewInvocations("The connection was lost");
 0954            _ = _shutdownRequestedTcs.TrySetResult();
 0955            throw;
 956        }
 228957    }
 958
 959    private void CheckPeerHeaderSize(int headerSize)
 1815960    {
 1815961        if (headerSize > _maxPeerHeaderSize)
 2962        {
 2963            throw new IceRpcException(
 2964                IceRpcError.LimitExceeded,
 2965                $"The header size ({headerSize}) for an icerpc request or response is greater than the peer's max header
 966        }
 1813967    }
 968
 969    private void DecrementDispatchInvocationCount()
 2865970    {
 971        lock (_mutex)
 2865972        {
 2865973            if (--_dispatchInvocationCount == 0)
 569974            {
 569975                if (_shutdownTask is not null)
 30976                {
 30977                    _dispatchesAndInvocationsCompleted.TrySetResult();
 30978                }
 539979                else if (!_refuseInvocations && _streamInputOutputCount == 0)
 430980                {
 430981                    ScheduleInactivityCheck();
 430982                }
 569983            }
 2865984        }
 2865985    }
 986
 987    /// <summary>Decrements the stream input/output count.</summary>
 988    private void DecrementStreamInputOutputCount()
 3656989    {
 990        lock (_mutex)
 3656991        {
 3656992            if (--_streamInputOutputCount == 0)
 558993            {
 558994                if (_shutdownTask is not null)
 28995                {
 28996                    _streamsCompleted.TrySetResult();
 28997                }
 530998                else if (!_refuseInvocations && _dispatchInvocationCount == 0)
 98999                {
 1000                    // We enable the inactivity check in order to complete _shutdownRequestedTcs when inactive for too
 1001                    // long. _refuseInvocations is true when the connection is either about to be "shutdown requested",
 1002                    // or shut down / disposed. We don't need to complete _shutdownRequestedTcs in any of these
 1003                    // situations.
 981004                    ScheduleInactivityCheck();
 981005                }
 5581006            }
 36561007        }
 36561008    }
 1009
 1010    private async Task DispatchRequestAsync(IMultiplexedStream stream)
 14201011    {
 1012        // _disposedCts is not disposed since we own a dispatch count.
 14201013        CancellationToken cancellationToken = stream.IsBidirectional ?
 14201014            stream.WritesClosed.AsCancellationToken(_disposedCts.Token) :
 14201015            _disposedCts.Token;
 1016
 14201017        PipeReader? fieldsPipeReader = null;
 1018        IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields;
 1019        IceRpcRequestHeader header;
 1020
 14201021        PipeReader? streamInput = stream.Input;
 14201022        PipeWriter? streamOutput = stream.IsBidirectional ? stream.Output : null;
 14201023        bool success = false;
 1024
 1025        try
 14201026        {
 1027            try
 14201028            {
 14201029                ReadResult readResult = await streamInput.ReadSegmentAsync(
 14201030                    SliceEncoding.Slice2,
 14201031                    _maxLocalHeaderSize,
 14201032                    cancellationToken).ConfigureAwait(false);
 1033
 14141034                if (readResult.Buffer.IsEmpty)
 11035                {
 11036                    throw new IceRpcException(IceRpcError.IceRpcError, "Received icerpc request with empty header.");
 1037                }
 1038
 14131039                (header, fields, fieldsPipeReader) = DecodeHeader(readResult.Buffer);
 14131040                streamInput.AdvanceTo(readResult.Buffer.End);
 14131041            }
 31042            catch (InvalidDataException exception)
 31043            {
 31044                var rpcException = new IceRpcException(
 31045                    IceRpcError.IceRpcError,
 31046                    "Received invalid icerpc request header.",
 31047                    exception);
 1048
 31049                if (_taskExceptionObserver is null)
 11050                {
 11051                    throw rpcException;
 1052                }
 1053                else
 21054                {
 21055                    _taskExceptionObserver.DispatchRefused(
 21056                        _connectionContext!.TransportConnectionInformation,
 21057                        rpcException);
 21058                    return; // success remains false
 1059                }
 1060            }
 41061            catch (Exception exception) when (_taskExceptionObserver is not null)
 21062            {
 21063                _taskExceptionObserver.DispatchRefused(_connectionContext!.TransportConnectionInformation, exception);
 21064                return; // success remains false
 1065            }
 1066
 14131067            using var request = new IncomingRequest(Protocol.IceRpc, _connectionContext!)
 14131068            {
 14131069                Fields = fields,
 14131070                IsOneway = !stream.IsBidirectional,
 14131071                Operation = header.Operation,
 14131072                Path = header.Path,
 14131073                Payload = streamInput
 14131074            };
 1075
 14131076            streamInput = null; // the request now owns streamInput
 1077
 1078            try
 14131079            {
 14131080                OutgoingResponse response = await PerformDispatchRequestAsync(request, cancellationToken)
 14131081                    .ConfigureAwait(false);
 1082
 14051083                if (!request.IsOneway)
 3921084                {
 3921085                    Debug.Assert(streamOutput is not null);
 3921086                    EncodeHeader(response);
 1087
 3901088                    PipeWriter payloadWriter = response.GetPayloadWriter(streamOutput);
 1089
 1090                    // We give flushResult an initial "failed" value, in case the first CopyFromAsync throws.
 3901091                    var flushResult = new FlushResult(isCanceled: true, isCompleted: false);
 1092
 1093                    try
 3901094                    {
 1095                        // We don't use cancellationToken here because it's canceled shortly afterwards by the
 1096                        // completion of writesClosed. This works around https://github.com/dotnet/runtime/issues/82704
 1097                        // where the stream would otherwise be aborted after the successful write. It's also fine to
 1098                        // just use _disposedCts.Token: if writes are closed because the peer is not longer interested
 1099                        // in the response, the write operations will raise an IceRpcException(StreamAborted) which is
 1100                        // ignored.
 3901101                        bool hasContinuation = response.PayloadContinuation is not null;
 1102
 3901103                        flushResult = await payloadWriter.CopyFromAsync(
 3901104                            response.Payload,
 3901105                            stream.WritesClosed,
 3901106                            endStream: !hasContinuation,
 3901107                            _disposedCts.Token).ConfigureAwait(false);
 1108
 3861109                        if (!flushResult.IsCompleted && !flushResult.IsCanceled && hasContinuation)
 21110                        {
 21111                            flushResult = await payloadWriter.CopyFromAsync(
 21112                                response.PayloadContinuation!,
 21113                                stream.WritesClosed,
 21114                                endStream: true,
 21115                                _disposedCts.Token).ConfigureAwait(false);
 11116                        }
 3851117                    }
 1118                    finally
 3901119                    {
 3901120                        payloadWriter.CompleteOutput(success: !flushResult.IsCanceled);
 3901121                        response.Payload.Complete();
 3901122                        response.PayloadContinuation?.Complete();
 3901123                    }
 3851124                }
 13981125            }
 151126            catch (Exception exception) when (_taskExceptionObserver is not null)
 71127            {
 71128                _taskExceptionObserver.DispatchFailed(
 71129                    request,
 71130                    _connectionContext!.TransportConnectionInformation,
 71131                    exception);
 71132                return; // success remains false
 1133            }
 13981134            success = true;
 13981135        }
 11136        catch (IceRpcException)
 11137        {
 1138            // Expected, with for example:
 1139            //  - IceRpcError.ConnectionAborted when the peer aborts the connection
 1140            //  - IceRpcError.IceRpcError when the request header is invalid
 1141            //  - IceRpcError.TruncatedData when the request header is truncated
 11142        }
 101143        catch (OperationCanceledException exception) when (
 101144            exception.CancellationToken == cancellationToken ||
 101145            exception.CancellationToken == _disposedCts.Token)
 101146        {
 1147            // Expected if the dispatch is canceled by the peer or the connection is disposed.
 101148        }
 01149        catch (Exception exception)
 01150        {
 1151            // This exception is unexpected when running the IceRPC test suite. A test that expects this exception must
 1152            // install a task exception observer.
 01153            Debug.Fail($"icerpc dispatch failed with an unexpected exception: {exception}");
 1154
 1155            // Generate unobserved task exception (UTE). If this exception is expected (e.g. an expected payload read
 1156            // exception) and the application wants to avoid this UTE, it must configure a non-null logger to install
 1157            // a task exception observer.
 01158            throw;
 1159        }
 1160        finally
 14201161        {
 14201162            if (!success)
 221163            {
 1164                // We always need to complete streamOutput when an exception is thrown. For example, we received an
 1165                // invalid request header that we could not decode.
 221166                streamOutput?.CompleteOutput(success: false);
 221167                streamInput?.Complete();
 221168            }
 14201169            fieldsPipeReader?.Complete();
 1170
 14201171            DecrementDispatchInvocationCount();
 14201172        }
 1173
 1174        async Task<OutgoingResponse> PerformDispatchRequestAsync(
 1175            IncomingRequest request,
 1176            CancellationToken cancellationToken)
 14131177        {
 14131178            Debug.Assert(_dispatcher is not null);
 1179
 1180            OutgoingResponse response;
 1181
 1182            try
 14131183            {
 14131184                if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 14131185                {
 14131186                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 14131187                }
 1188
 1189                try
 14131190                {
 14131191                    response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13941192                }
 1193                finally
 14131194                {
 14131195                    _dispatchSemaphore?.Release();
 14131196                }
 1197
 13941198                if (response != request.Response)
 11199                {
 11200                    throw new InvalidOperationException(
 11201                        "The dispatcher did not return the last response created for this request.");
 1202                }
 13931203            }
 91204            catch (OperationCanceledException exception) when (cancellationToken == exception.CancellationToken)
 81205            {
 81206                throw;
 1207            }
 121208            catch (Exception exception)
 121209            {
 121210                if (exception is not DispatchException dispatchException)
 81211                {
 81212                    StatusCode statusCode = exception switch
 81213                    {
 11214                        InvalidDataException => StatusCode.InvalidData,
 41215                        IceRpcException iceRpcException when iceRpcException.IceRpcError == IceRpcError.TruncatedData =>
 41216                            StatusCode.TruncatedPayload,
 31217                        _ => StatusCode.InternalError
 81218                    };
 81219                    dispatchException = new DispatchException(statusCode, message: null, exception);
 81220                }
 121221                response = dispatchException.ToOutgoingResponse(request);
 121222            }
 1223
 14051224            return response;
 14051225        }
 1226
 1227        static (IceRpcRequestHeader, IDictionary<RequestFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 1228            ReadOnlySequence<byte> buffer)
 14131229        {
 14131230            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 14131231            var header = new IceRpcRequestHeader(ref decoder);
 14131232            (IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 14131233                DecodeFieldDictionary(
 14131234                    ref decoder,
 14271235                    (ref SliceDecoder decoder) => decoder.DecodeRequestFieldKey());
 1236
 14131237            return (header, fields, pipeReader);
 14131238        }
 1239
 1240        void EncodeHeader(OutgoingResponse response)
 3921241        {
 3921242            var encoder = new SliceEncoder(streamOutput, SliceEncoding.Slice2);
 1243
 1244            // Write the IceRpc response header.
 3921245            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 1246
 1247            // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 3921248            long headerStartPos = streamOutput.UnflushedBytes;
 1249
 3921250            encoder.EncodeStatusCode(response.StatusCode);
 3921251            if (response.StatusCode > StatusCode.Ok)
 311252            {
 311253                encoder.EncodeString(response.ErrorMessage!);
 311254            }
 1255
 3921256            EncodeFieldDictionary(
 3921257                response.Fields,
 51258                (ref SliceEncoder encoder, ResponseFieldKey key) => encoder.EncodeResponseFieldKey(key),
 3921259                ref encoder,
 3921260                streamOutput);
 1261
 1262            // We're done with the header encoding, write the header size.
 3911263            int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 3911264            CheckPeerHeaderSize(headerSize);
 3901265            SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 3901266        }
 14201267    }
 1268
 1269    /// <summary>Encodes the fields dictionary at the end of a request or response header.</summary>
 1270    /// <remarks>This method can write bytes directly to <paramref name="output"/> without going through
 1271    /// <paramref name="encoder"/>.</remarks>
 1272    private void EncodeFieldDictionary<TKey>(
 1273        IDictionary<TKey, OutgoingFieldValue> fields,
 1274        EncodeAction<TKey> encodeKeyAction,
 1275        ref SliceEncoder encoder,
 1276        PipeWriter output) where TKey : struct =>
 18161277        encoder.EncodeDictionary(
 18161278            fields,
 18161279            encodeKeyAction,
 18161280            (ref SliceEncoder encoder, OutgoingFieldValue value) =>
 191281                {
 191282                    if (value.WriteAction is Action<IBufferWriter<byte>> writeAction)
 91283                    {
 91284                        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 91285                        long startPos = output.UnflushedBytes;
 91286                        writeAction(output);
 81287                        SliceEncoder.EncodeVarUInt62((ulong)(output.UnflushedBytes - startPos), sizePlaceholder);
 81288                    }
 18161289                    else
 101290                    {
 101291                        encoder.EncodeSize(checked((int)value.ByteSequence.Length));
 101292                        encoder.WriteByteSequence(value.ByteSequence);
 101293                    }
 18341294                });
 1295
 1296    /// <summary>Increments the dispatch-invocation count.</summary>
 1297    /// <remarks>This method must be called with _mutex locked.</remarks>
 1298    private void IncrementDispatchInvocationCount()
 28651299    {
             // Post-increment: the comparison sees the count as it was before this call, so the branch below is only
             // taken on the 0 -> 1 transition, and only when no stream input/output is outstanding either.
 28651300        if (_dispatchInvocationCount++ == 0 && _streamInputOutputCount == 0)
 5691301        {
 1302            // Cancel inactivity check.
                 // An infinite due time and an infinite period stop the timer from firing until it is changed again.
 5691303            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 5691304        }
 28651305    }
 1306
 1307    /// <summary>Increments the stream input/output count.</summary>
 1308    /// <remarks>This method must be called with _mutex locked.</remarks>
         /// <param name="bidirectional"><see langword="true"/> adds 2 to the count; <see langword="false"/> adds 1.
         /// </param>
 1309    private void IncrementStreamInputOutputCount(bool bidirectional)
 28441310    {
             // The caller is expected to already hold a dispatch-invocation count.
 28441311        Debug.Assert(_dispatchInvocationCount > 0);
             // Presumably one unit per stream half (input and output) that will have to be closed - confirm against
             // DecrementStreamInputOutputCount.
 28441312        _streamInputOutputCount += bidirectional ? 2 : 1;
 28441313    }
 1314
 1315    private async Task ReadGoAwayAsync(CancellationToken cancellationToken)
 2981316    {
             // Reads the GoAway frame from the remote control stream and stores it in _goAwayFrame. It then refuses new
             // invocations, cancels _goAwayCts and completes _shutdownRequestedTcs.
 2981317        await Task.Yield(); // exit mutex lock
 1318
 1319        // Wait for _connectTask (which spawned the task running this method) to complete. This await can't fail.
 1320        // This guarantees this method won't request a shutdown until after _connectTask completed successfully.
 2981321        await _connectTask!.ConfigureAwait(false);
 1322
 2981323        PipeReader remoteInput = _remoteControlStream!.Input!;
 1324
 1325        try
 2981326        {
 1327            // Wait to receive the GoAway frame.
 2981328            await ReceiveControlFrameHeaderAsync(IceRpcControlFrameType.GoAway, cancellationToken)
 2981329                .ConfigureAwait(false);
 1330
                 // The frame body is read as a single segment limited to MaxGoAwayFrameBodySize.
 791331            ReadResult readResult = await remoteInput.ReadSegmentAsync(
 791332                SliceEncoding.Slice2,
 791333                MaxGoAwayFrameBodySize,
 791334                cancellationToken).ConfigureAwait(false);
 1335
 1336            // We don't call CancelPendingRead on remoteInput
 771337            Debug.Assert(!readResult.IsCanceled);
 1338
 1339            try
 771340            {
 771341                _goAwayFrame = SliceEncoding.Slice2.DecodeBuffer(
 771342                    readResult.Buffer,
 1541343                    (ref SliceDecoder decoder) => new IceRpcGoAway(ref decoder));
 761344            }
 1345            finally
 771346            {
                     // Mark the whole buffer as consumed whether or not the decoding succeeded.
 771347                remoteInput.AdvanceTo(readResult.Buffer.End);
 771348            }
 1349
 761350            RefuseNewInvocations("The connection was shut down because it received a GoAway frame from the peer.");
 761351            _goAwayCts.Cancel();
                 // The Try variant is used and its result discarded, so this is harmless when the TCS is already completed.
 761352            _ = _shutdownRequestedTcs.TrySetResult();
 761353        }
 1371354        catch (OperationCanceledException)
 1371355        {
 1356            // The connection is disposed and we let this exception cancel the task.
 1371357            throw;
 1358        }
 801359        catch (IceRpcException)
 801360        {
 1361            // We let the task complete with this expected exception.
 801362            throw;
 1363        }
 51364        catch (InvalidDataException exception)
 51365        {
 1366            // "expected" in the sense it should not trigger a Debug.Fail.
                 // The InvalidDataException is preserved as the inner exception of the IceRpcException.
 51367            throw new IceRpcException(
 51368                IceRpcError.IceRpcError,
 51369                "The ReadGoAway task was aborted by an icerpc protocol error.",
 51370                exception);
 1371        }
 01372        catch (Exception exception)
 01373        {
                 // Any other exception type is unexpected: fail in Debug builds, then rethrow unchanged.
 01374            Debug.Fail($"The read go away task failed with an unexpected exception: {exception}");
 01375            throw;
 1376        }
 761377    }
 1378
 1379    private async ValueTask ReceiveControlFrameHeaderAsync(
 1380        IceRpcControlFrameType expectedFrameType,
 1381        CancellationToken cancellationToken)
 6081382    {
             // Reads the frame type (the first byte available on the remote control stream) and verifies it is
             // expectedFrameType. Throws InvalidDataException when no data is available or when the type differs.
             // On success, exactly one byte is consumed; the frame body remains in the pipe for the caller.
 6081383        ReadResult readResult = await _remoteControlStream!.Input.ReadAsync(cancellationToken).ConfigureAwait(false);
 1384
 1385        // We don't call CancelPendingRead on _remoteControlStream.Input.
 3871386        Debug.Assert(!readResult.IsCanceled);
 1387
             // NOTE(review): the two throwing paths below do not call AdvanceTo on the input. Presumably acceptable
             // because the caller tears down the connection after such a failure - confirm.
 3871388        if (readResult.Buffer.IsEmpty)
 11389        {
 11390            throw new InvalidDataException(
 11391                "Failed to read the frame type because no more data is available from the control stream.");
 1392        }
 1393
 3861394        var frameType = (IceRpcControlFrameType)readResult.Buffer.FirstSpan[0];
 3861395        if (frameType != expectedFrameType)
 31396        {
 31397            throw new InvalidDataException($"Received frame type {frameType} but expected {expectedFrameType}.");
 1398        }
             // Consume only the frame type byte.
 3831399        _remoteControlStream!.Input.AdvanceTo(readResult.Buffer.GetPosition(1));
 3831400    }
 1401
 1402    private async ValueTask ReceiveSettingsFrameBody(CancellationToken cancellationToken)
 3041403    {
             // Reads and decodes the body of the Settings frame from the remote control stream. When the peer sent a
             // MaxHeaderSize setting, updates _maxPeerHeaderSize and _headerSizeLength accordingly; otherwise both
             // fields are left untouched.
 1404        // We are still in the single-threaded initialization at this point.
 1405
 3041406        PipeReader input = _remoteControlStream!.Input;
             // The frame body is read as a single segment limited to MaxSettingsFrameBodySize.
 3041407        ReadResult readResult = await input.ReadSegmentAsync(
 3041408            SliceEncoding.Slice2,
 3041409            MaxSettingsFrameBodySize,
 3041410            cancellationToken).ConfigureAwait(false);
 1411
 1412        // We don't call CancelPendingRead on _remoteControlStream.Input
 3031413        Debug.Assert(!readResult.IsCanceled);
 1414
 1415        try
 3031416        {
 3031417            IceRpcSettings settings = SliceEncoding.Slice2.DecodeBuffer(
 3031418                readResult.Buffer,
 6061419                (ref SliceDecoder decoder) => new IceRpcSettings(ref decoder));
 1420
 3001421            if (settings.Value.TryGetValue(IceRpcSettingKey.MaxHeaderSize, out ulong value))
 31422            {
 1423                // a varuint62 always fits in a long
 1424                try
 31425                {
 31426                    _maxPeerHeaderSize = ConnectionOptions.IceRpcCheckMaxHeaderSize((long)value);
 21427                }
 11428                catch (ArgumentOutOfRangeException exception)
 11429                {
                         // The value comes from the peer, so a rejected value is reported as invalid data.
 11430                    throw new InvalidDataException("Received invalid maximum header size setting.", exception);
 1431                }
                     // Size placeholders use as many bytes as needed to encode the peer's maximum header size.
 21432                _headerSizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 21433            }
 1434            // all other settings are unknown and ignored
 2991435        }
 1436        finally
 3031437        {
                 // Mark the whole buffer as consumed whether or not the decoding succeeded.
 3031438            input.AdvanceTo(readResult.Buffer.End);
 3031439        }
 2991440    }
 1441
 1442    private void RefuseNewInvocations(string message)
 6171443    {
             // Sets _refuseInvocations and records the reason, both under _mutex.
 1444        lock (_mutex)
 6171445        {
 6171446            _refuseInvocations = true;
                 // ??= keeps the first message recorded: a later call does not overwrite it.
 6171447            _invocationRefusedMessage ??= message;
 6171448        }
 6171449    }
 1450
 1451    // The inactivity check executes once in _inactivityTimeout. By then either:
 1452    // - the connection is no longer inactive (and the inactivity check is canceled or being canceled)
 1453    // - the connection is still inactive and we request shutdown
         // The period argument is Timeout.InfiniteTimeSpan, so (assuming _inactivityTimeoutTimer is a
         // System.Threading.Timer - confirm) the timer fires at most once per call and is not periodic.
 1454    private void ScheduleInactivityCheck() =>
 8261455        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1456
 1457    private ValueTask<FlushResult> SendControlFrameAsync(
 1458        IceRpcControlFrameType frameType,
 1459        EncodeAction encodeAction,
 1460        CancellationToken cancellationToken)
 4131461    {
             // Encodes a control frame on the local control stream and flushes it. The encoding is synchronous; only
             // the flush is asynchronous and its ValueTask is returned to the caller as is.
 4131462        PipeWriter output = _controlStream!.Output;
 1463
 4131464        EncodeFrame(output);
 1465
 4131466        return output.FlushAsync(cancellationToken); // Flush
 1467
             // Frame layout: the frame type, a size encoded on _headerSizeLength bytes, then the body produced by
             // encodeAction. The size is back-filled once the body is encoded and counts the body bytes only.
 1468        void EncodeFrame(IBufferWriter<byte> buffer)
 4131469        {
 4131470            var encoder = new SliceEncoder(buffer, SliceEncoding.Slice2);
 4131471            encoder.EncodeIceRpcControlFrameType(frameType);
 4131472            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 4131473            int startPos = encoder.EncodedByteCount; // does not include the size
 4131474            encodeAction.Invoke(ref encoder);
 4131475            int frameSize = encoder.EncodedByteCount - startPos;
 4131476            SliceEncoder.EncodeVarUInt62((uint)frameSize, sizePlaceholder);
 4131477        }
 4131478    }
 1479
 1480    /// <summary>Sends the payload continuation of an outgoing request in the background.</summary>
 1481    /// <remarks>We send the payload continuation on a separate thread with Task.Run: this ensures that the synchronous
 1482    /// activity that could result from reading or writing the payload continuation doesn't delay in any way the
 1483    /// caller. </remarks>
 1484    /// <param name="request">The outgoing request.</param>
 1485    /// <param name="payloadWriter">The payload writer.</param>
 1486    /// <param name="writesClosed">A task that completes when we can no longer write to payloadWriter.</param>
 1487    /// <param name="onGoAway">An action to execute with a CTS when we receive the GoAway frame from the peer.</param>
 1488    /// <param name="cancellationToken">The cancellation token of the invocation; the associated CTS is disposed when
 1489    /// the invocation completes.</param>
 1490    private void SendRequestPayloadContinuation(
 1491        OutgoingRequest request,
 1492        PipeWriter payloadWriter,
 1493        Task writesClosed,
 1494        Action<object?> onGoAway,
 1495        CancellationToken cancellationToken)
 121496    {
 121497        Debug.Assert(request.PayloadContinuation is not null);
 1498
 1499        // First "detach" the continuation.
             // From here on, the request no longer references the continuation; the background task completes it.
 121500        PipeReader payloadContinuation = request.PayloadContinuation;
 121501        request.PayloadContinuation = null;
 1502
 1503        lock (_mutex)
 121504        {
 121505            Debug.Assert(_dispatchInvocationCount > 0); // as a result, can't be disposed.
 1506
 1507            // Give the task its own dispatch-invocation count. This ensures the transport connection won't be disposed
 1508            // while the continuation is being sent.
 121509            IncrementDispatchInvocationCount();
 121510        }
 1511
 1512        // This background task owns payloadContinuation, payloadWriter and 1 dispatch-invocation count, and must clean
 1513        // them up. Hence CancellationToken.None.
 121514        _ = Task.Run(PerformSendRequestPayloadContinuationAsync, CancellationToken.None);
 1515
 1516        async Task PerformSendRequestPayloadContinuationAsync()
 121517        {
                 // Stays false unless the copy returns and its flush result is not canceled; the finally block passes
                 // it to payloadWriter.CompleteOutput.
 121518            bool success = false;
 1519
 1520            try
 121521            {
 1522                // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 121523                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 1524
 1525                // This token registration is needed for one-way requests and is redundant for two-way requests.
 1526                // We want GoAway to cancel the sending of one-way requests that have not been received by the peer,
 1527                // especially when these requests have payload continuations.
 121528                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(onGoAway, cts);
 1529
 1530                try
 121531                {
 1532                    // The cancellation of the InvokeAsync's cancellationToken cancels cts only until InvokeAsync's
 1533                    // PerformInvokeAsync completes. Afterwards, the cancellation of InvokeAsync's cancellationToken has
 1534                    // no effect on cts, so it doesn't cancel the copying of payloadContinuation.
 121535                    FlushResult flushResult = await payloadWriter.CopyFromAsync(
 121536                        payloadContinuation,
 121537                        writesClosed,
 121538                        endStream: true,
 121539                        cts.Token).ConfigureAwait(false);
 1540
 51541                    success = !flushResult.IsCanceled;
 51542                }
 31543                catch (OperationCanceledException exception) when (exception.CancellationToken == cts.Token)
 21544                {
 1545                    // Process/translate this exception primarily for the benefit of _taskExceptionObserver.
 1546
 1547                    // Can be because cancellationToken was canceled by DisposeAsync or GoAway; that's fine.
 21548                    cancellationToken.ThrowIfCancellationRequested();
 1549
                         // cancellationToken is not canceled here, so the cancellation of cts is attributed to
                         // _disposedCts or, failing that, to GoAway.
 11550                    if (_disposedCts.IsCancellationRequested)
 01551                    {
 1552                        // DisposeAsync aborted the request.
 01553                        throw new IceRpcException(IceRpcError.OperationAborted);
 1554                    }
 1555                    else
 11556                    {
 1557                        // When _goAwayCts is canceled and onGoAway cancels its argument:
 1558                        // - if PerformInvokeAsync is no longer running (typical for a one-way request), we get here
 1559                        // - if PerformInvokeAsync is still running, we may get here or cancellationToken gets canceled
 1560                        // first.
 11561                        Debug.Assert(_goAwayCts.IsCancellationRequested);
 11562                        throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 1563                    }
 1564                }
 51565            }
                 // The catch clauses are evaluated in order: when a task exception observer is installed, it receives
                 // every exception and none of the following clauses runs.
 71566            catch (Exception exception) when (_taskExceptionObserver is not null)
 51567            {
 51568                _taskExceptionObserver.RequestPayloadContinuationFailed(
 51569                    request,
 51570                    _connectionContext!.TransportConnectionInformation,
 51571                    exception);
 51572            }
 11573            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 11574            {
 1575                // Expected.
 11576            }
 11577            catch (IceRpcException)
 11578            {
 1579                // Expected, with for example IceRpcError.ConnectionAborted when the peer aborts the connection.
 11580            }
 01581            catch (Exception exception)
 01582            {
 1583                // This exception is unexpected when running the IceRPC test suite. A test that expects such an
 1584                // exception must install a task exception observer.
 01585                Debug.Fail($"Failed to send payload continuation of request {request}: {exception}");
 1586
 1587                // If Debug is not enabled and there is no task exception observer, we rethrow to generate an
 1588                // Unobserved Task Exception.
 01589                throw;
 1590            }
 1591            finally
 121592            {
                     // Release everything this task owns, on every path: the writer, the detached continuation and
                     // the dispatch-invocation count taken before Task.Run.
 121593                payloadWriter.CompleteOutput(success);
 121594                payloadContinuation.Complete();
 121595                DecrementDispatchInvocationCount();
 121596            }
 121597        }
 121598    }
 1599}

Methods/Properties

get_IsServer()
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.Transports.TransportConnectionInformation,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DisposeAsync()
PerformDisposeAsync()
InvokeAsync(IceRpc.OutgoingRequest,System.Threading.CancellationToken)
PerformInvokeAsync()
OnGoAway()
DecodeHeader()
EncodeHeader()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
DecodeFieldDictionary(ZeroC.Slice.SliceDecoder&,ZeroC.Slice.DecodeFunc`1<TKey>)
AcceptRequestsAsync()
CheckPeerHeaderSize(System.Int32)
DecrementDispatchInvocationCount()
DecrementStreamInputOutputCount()
DispatchRequestAsync()
PerformDispatchRequestAsync()
DecodeHeader()
EncodeHeader()
EncodeFieldDictionary(System.Collections.Generic.IDictionary`2<TKey,IceRpc.OutgoingFieldValue>,ZeroC.Slice.EncodeAction`1<TKey>,ZeroC.Slice.SliceEncoder&,System.IO.Pipelines.PipeWriter)
IncrementDispatchInvocationCount()
IncrementStreamInputOutputCount(System.Boolean)
ReadGoAwayAsync()
ReceiveControlFrameHeaderAsync()
ReceiveSettingsFrameBody()
RefuseNewInvocations(System.String)
ScheduleInactivityCheck()
SendControlFrameAsync(IceRpc.Internal.IceRpcControlFrameType,ZeroC.Slice.EncodeAction,System.Threading.CancellationToken)
EncodeFrame()
SendRequestPayloadContinuation(IceRpc.OutgoingRequest,System.IO.Pipelines.PipeWriter,System.Threading.Tasks.Task,System.Action`1<System.Object>,System.Threading.CancellationToken)
PerformSendRequestPayloadContinuationAsync()