< Summary

Information
Class: IceRpc.Internal.IceRpcProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs
Tag: 1321_24790053727
Line coverage
91%
Covered lines: 852
Uncovered lines: 79
Coverable lines: 931
Total lines: 1593
Line coverage: 91.5%
Branch coverage
89%
Covered branches: 200
Total branches: 224
Branch coverage: 89.2%
Method coverage
100%
Covered methods: 34
Fully covered methods: 21
Total methods: 34
Method coverage: 100%
Full method coverage: 61.7%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceRpcProtocolConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using System.Buffers;
 5using System.Collections.Immutable;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Security.Authentication;
 9using ZeroC.Slice.Codec;
 10
 11namespace IceRpc.Internal;
 12
 13internal sealed class IceRpcProtocolConnection : IProtocolConnection
 14{
 15    private const int MaxGoAwayFrameBodySize = 16;
 16    private const int MaxSettingsFrameBodySize = 1024;
 17
 149918    private bool IsServer => _transportConnectionInformation is not null;
 19
 20    private Task? _acceptRequestsTask;
 21
 22    private Task? _connectTask;
 23    private IConnectionContext? _connectionContext; // non-null once the connection is established
 24    private IMultiplexedStream? _controlStream;
 25
 26    // The number of outstanding dispatches and invocations.
 27    // DisposeAsync waits until this count reaches 0 (using _dispatchesAndInvocationsCompleted) before disposing the
 28    // underlying transport connection. So when this count is greater than 0, we know _transportConnection and other
 29    // fields are not disposed.
 30    // _dispatchInvocationCount is also used for the inactivity check: the connection remains active while
 31    // _dispatchInvocationCount > 0 or _streamInputOutputCount > 0.
 32    private int _dispatchInvocationCount;
 33
 34    private readonly SemaphoreSlim? _dispatchSemaphore;
 35
 36    private readonly IDispatcher? _dispatcher;
 38137    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 38138        new(TaskCreationOptions.RunContinuationsAsynchronously);
 39
 40    private Task? _disposeTask;
 41
 42    // This cancellation token source is canceled when the connection is disposed.
 38143    private readonly CancellationTokenSource _disposedCts = new();
 44
 45    // Canceled when we receive the GoAway frame from the peer.
 38146    private readonly CancellationTokenSource _goAwayCts = new();
 47
 48    // The GoAway frame received from the peer. Read it only after _goAwayCts is canceled.
 49    private IceRpcGoAway _goAwayFrame;
 50
 51    // The number of bytes we need to encode a size up to _maxPeerHeaderSize. It's 2 for DefaultMaxIceRpcHeaderSize.
 38152    private int _headerSizeLength = 2;
 53
 54    private readonly TimeSpan _inactivityTimeout;
 55    private readonly Timer _inactivityTimeoutTimer;
 56    private string? _invocationRefusedMessage;
 57
 58    // The ID of the last bidirectional stream accepted by this connection. It's null as long as no bidirectional stream
 59    // was accepted.
 60    private ulong? _lastRemoteBidirectionalStreamId;
 61
 62    // The ID of the last unidirectional stream accepted by this connection. It's null as long as no unidirectional
 63    // stream (other than _remoteControlStream) was accepted.
 64    private ulong? _lastRemoteUnidirectionalStreamId;
 65
 66    private readonly int _maxLocalHeaderSize;
 38167    private int _maxPeerHeaderSize = ConnectionOptions.DefaultMaxIceRpcHeaderSize;
 68
 38169    private readonly Lock _mutex = new();
 70
 71    private Task? _readGoAwayTask;
 72
 73    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 74    private bool _refuseInvocations;
 75
 76    private IMultiplexedStream? _remoteControlStream;
 77
 78    private readonly CancellationTokenSource _shutdownOrGoAwayCts;
 79
 80    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 81    // holding a lock on _mutex.
 38182    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 83
 84    private Task? _shutdownTask;
 85
 86    // Keeps track of the number of stream Input and Output that are not completed yet.
 87    // It's not the same as the _dispatchInvocationCount: a dispatch or invocation can be completed while the
 88    // application is still reading an incoming frame payload that corresponds to a stream input.
 89    // ShutdownAsync waits for both _streamInputOutputCount and _dispatchInvocationCount to reach 0, while DisposeAsync
 90    // only waits for _dispatchInvocationCount to reach 0.
 91    private int _streamInputOutputCount;
 92
 93    // The streams are completed when _shutdownTask is not null and _streamInputOutputCount is 0.
 38194    private readonly TaskCompletionSource _streamsCompleted = new(TaskCreationOptions.RunContinuationsAsynchronously);
 95
 96    private readonly ITaskExceptionObserver? _taskExceptionObserver;
 97
 98    private readonly IMultiplexedConnection _transportConnection;
 99
 100    // Only set for server connections.
 101    private readonly TransportConnectionInformation? _transportConnectionInformation;
 102
 103    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 104        CancellationToken cancellationToken)
 374105    {
 106        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 107
 108        lock (_mutex)
 374109        {
 374110            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 111
 372112            if (_connectTask is not null)
 0113            {
 0114                throw new InvalidOperationException("Cannot call connect more than once.");
 115            }
 116
 372117            result = PerformConnectAsync();
 372118            _connectTask = result;
 372119        }
 372120        return result;
 121
 122        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 372123        {
 124            // Make sure we execute the function without holding the connection mutex lock.
 372125            await Task.Yield();
 126
 127            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 372128            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 372129                cancellationToken,
 372130                _disposedCts.Token);
 131
 132            TransportConnectionInformation transportConnectionInformation;
 133
 134            try
 372135            {
 136                // If the transport connection information is null, we need to connect the transport connection. It's
 137                // null for client connections. The transport connection of a server connection is established by
 138                // Server.
 372139                transportConnectionInformation = _transportConnectionInformation ??
 372140                    await _transportConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 141
 347142                _controlStream = await _transportConnection.CreateStreamAsync(
 347143                    false,
 347144                    connectCts.Token).ConfigureAwait(false);
 145
 343146                var settings = new IceRpcSettings(
 343147                    _maxLocalHeaderSize == ConnectionOptions.DefaultMaxIceRpcHeaderSize ?
 343148                        ImmutableDictionary<IceRpcSettingKey, ulong>.Empty :
 343149                        new Dictionary<IceRpcSettingKey, ulong>
 343150                        {
 343151                            [IceRpcSettingKey.MaxHeaderSize] = (ulong)_maxLocalHeaderSize
 343152                        });
 153
 154                try
 343155                {
 343156                    await SendControlFrameAsync(
 343157                        IceRpcControlFrameType.Settings,
 343158                        settings.Encode,
 343159                        connectCts.Token).ConfigureAwait(false);
 339160                }
 4161                catch
 4162                {
 163                    // If we fail to send the Settings frame, we are in an abortive closure and we close Output to allow
 164                    // the peer to continue if it's waiting for us. This could happen when the cancellation token is
 165                    // canceled.
 4166                    _controlStream!.Output.CompleteOutput(success: false);
 4167                    throw;
 168                }
 169
 170                // Wait for the remote control stream to be accepted and read the protocol Settings frame
 339171                _remoteControlStream = await _transportConnection.AcceptStreamAsync(
 339172                    connectCts.Token).ConfigureAwait(false);
 173
 322174                await ReceiveControlFrameHeaderAsync(
 322175                    IceRpcControlFrameType.Settings,
 322176                    connectCts.Token).ConfigureAwait(false);
 177
 315178                await ReceiveSettingsFrameBody(connectCts.Token).ConfigureAwait(false);
 310179            }
 29180            catch (OperationCanceledException)
 29181            {
 29182                cancellationToken.ThrowIfCancellationRequested();
 183
 11184                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 11185                throw new IceRpcException(
 11186                    IceRpcError.OperationAborted,
 11187                    "The connection establishment was aborted because the connection was disposed.");
 188            }
 7189            catch (InvalidDataException exception)
 7190            {
 7191                throw new IceRpcException(
 7192                    IceRpcError.ConnectionAborted,
 7193                    "The connection establishment was aborted by an icerpc protocol error.",
 7194                    exception);
 195            }
 1196            catch (AuthenticationException)
 1197            {
 1198                throw;
 199            }
 25200            catch (IceRpcException)
 25201            {
 25202                throw;
 203            }
 0204            catch (Exception exception)
 0205            {
 0206                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0207                throw;
 208            }
 209
 210            // This needs to be set before starting the accept requests task below.
 310211            _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 212
 213            // We assign _readGoAwayTask and _acceptRequestsTask with _mutex locked to make sure this assignment
 214            // occurs before the start of DisposeAsync. Once _disposeTask is not null, _readGoAwayTask etc are
 215            // immutable.
 216            lock (_mutex)
 310217            {
 310218                if (_disposeTask is not null)
 0219                {
 0220                    throw new IceRpcException(
 0221                        IceRpcError.OperationAborted,
 0222                        "The connection establishment was aborted because the connection was disposed.");
 223                }
 224
 225                // Read the go away frame from the control stream.
 310226                _readGoAwayTask = ReadGoAwayAsync(_disposedCts.Token);
 227
 228                // Start a task that accepts requests (the "accept requests loop")
 310229                _acceptRequestsTask = AcceptRequestsAsync(_shutdownOrGoAwayCts.Token);
 310230            }
 231
 232            // The _acceptRequestsTask waits for this PerformConnectAsync completion before reading anything. As soon as
 233            // it receives a request, it will cancel this inactivity check.
 310234            ScheduleInactivityCheck();
 235
 310236            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 310237        }
 372238    }
 239
 240    public ValueTask DisposeAsync()
 407241    {
 242        lock (_mutex)
 407243        {
 407244            if (_disposeTask is null)
 381245            {
 381246                RefuseNewInvocations("The connection was disposed.");
 247
 381248                if (_streamInputOutputCount == 0)
 368249                {
 250                    // That's only for consistency. _streamsCompleted.Task matters only to ShutdownAsync.
 368251                    _streamsCompleted.TrySetResult();
 368252                }
 381253                if (_dispatchInvocationCount == 0)
 372254                {
 372255                    _dispatchesAndInvocationsCompleted.TrySetResult();
 372256                }
 257
 381258                _shutdownTask ??= Task.CompletedTask;
 381259                _disposeTask = PerformDisposeAsync();
 381260            }
 407261        }
 407262        return new(_disposeTask);
 263
 264        async Task PerformDisposeAsync()
 381265        {
 266            // Make sure we execute the code below without holding the mutex lock.
 381267            await Task.Yield();
 268
 381269            _disposedCts.Cancel();
 270
 271            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 272
 381273            if (_connectTask is not null)
 372274            {
 275                // We wait for _dispatchesAndInvocationsCompleted (since dispatches and invocations are somewhat under
 276                // our control), but not for _streamsCompleted, since we can't make the application complete the
 277                // incoming payload pipe readers.
 278                try
 372279                {
 372280                    await Task.WhenAll(
 372281                        _connectTask,
 372282                        _acceptRequestsTask ?? Task.CompletedTask,
 372283                        _readGoAwayTask ?? Task.CompletedTask,
 372284                        _shutdownTask,
 372285                        _dispatchesAndInvocationsCompleted.Task).ConfigureAwait(false);
 82286                }
 290287                catch
 290288                {
 289                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 290                    // unexpected exceptions so there's no need to handle them here.
 290291                }
 372292            }
 293
 294            // If the application is still reading some incoming payload, the disposal of the transport connection can
 295            // abort this reading.
 381296            await _transportConnection.DisposeAsync().ConfigureAwait(false);
 297
 298            // It's safe to complete the output since write operations have been completed by the transport connection
 299            // disposal.
 381300            _controlStream?.Output.Complete();
 301
 302            // It's safe to complete the input since read operations have been completed by the transport connection
 303            // disposal.
 381304            _remoteControlStream?.Input.Complete();
 305
 381306            _dispatchSemaphore?.Dispose();
 381307            _disposedCts.Dispose();
 381308            _goAwayCts.Dispose();
 381309            _shutdownOrGoAwayCts.Dispose();
 310
 381311            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 381312        }
 407313    }
 314
 315    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1438316    {
 1438317        if (request.Protocol != Protocol.IceRpc)
 1318        {
 1319            throw new InvalidOperationException(
 1320                $"Cannot send {request.Protocol} request on {Protocol.IceRpc} connection.");
 321        }
 322
 323        lock (_mutex)
 1437324        {
 1437325            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 326
 1436327            if (_refuseInvocations)
 1328            {
 1329                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 330            }
 1435331            if (_connectTask is null)
 0332            {
 0333                throw new InvalidOperationException("Cannot invoke on a connection before connecting it.");
 334            }
 1435335            if (!IsServer && !_connectTask.IsCompletedSuccessfully)
 0336            {
 0337                throw new InvalidOperationException(
 0338                    "Cannot invoke on a client connection that is not fully established.");
 339            }
 340            // It's possible but rare to invoke on a server connection that is still connecting.
 341
 1435342            if (request.ServiceAddress.Fragment.Length > 0)
 0343            {
 0344                throw new NotSupportedException("The icerpc protocol does not support fragments.");
 345            }
 346
 1435347            IncrementDispatchInvocationCount();
 1435348        }
 349
 1435350        return PerformInvokeAsync();
 351
 352        async Task<IncomingResponse> PerformInvokeAsync()
 1435353        {
 354            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1435355            using var invocationCts = CancellationTokenSource.CreateLinkedTokenSource(
 1435356                cancellationToken,
 1435357                _disposedCts.Token);
 358
 1435359            PipeReader? streamInput = null;
 360
 361            // This try/catch block cleans up streamInput (when not null) and decrements the dispatch-invocation count.
 362            try
 1435363            {
 364                // Create the stream.
 365                IMultiplexedStream stream;
 366                try
 1435367                {
 368                    // We want to cancel CreateStreamAsync as soon as the connection is being shut down or receives a
 369                    // GoAway frame.
 1435370                    using CancellationTokenRegistration _ = _shutdownOrGoAwayCts.Token.UnsafeRegister(
 5371                        cts => ((CancellationTokenSource)cts!).Cancel(),
 1435372                        invocationCts);
 373
 1435374                    stream = await _transportConnection.CreateStreamAsync(
 1435375                        bidirectional: !request.IsOneway,
 1435376                        invocationCts.Token).ConfigureAwait(false);
 377
 1426378                    streamInput = stream.IsBidirectional ? stream.Input : null;
 1426379                }
 8380                catch (OperationCanceledException)
 8381                {
 8382                    cancellationToken.ThrowIfCancellationRequested();
 383
 384                    // Connection was shut down or disposed and we did not read the payload at all.
 6385                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 386                }
 1387                catch (IceRpcException exception)
 1388                {
 1389                    RefuseNewInvocations("The connection was lost.");
 1390                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 391                }
 0392                catch (Exception exception)
 0393                {
 0394                    Debug.Fail($"CreateStreamAsync failed with an unexpected exception: {exception}");
 0395                    RefuseNewInvocations("The connection was lost.");
 0396                    throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage, exception);
 397                }
 398
 1426399                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(
 1426400                    OnGoAway,
 1426401                    invocationCts);
 402
 403                PipeWriter payloadWriter;
 404
 405                try
 1426406                {
 407                    lock (_mutex)
 1426408                    {
 1426409                        if (_refuseInvocations)
 0410                        {
 411                            // Both stream.Input and stream.Output are completed by the catch/finally blocks below.
 0412                            throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 413                        }
 414
 1426415                        IncrementStreamInputOutputCount(stream.IsBidirectional);
 416
 417                        // Decorate the stream to decrement the input/output count on Complete.
 1426418                        stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 1426419                        streamInput = stream.IsBidirectional ? stream.Input : null;
 1426420                    }
 421
 1426422                    EncodeHeader(stream.Output);
 1425423                    payloadWriter = request.GetPayloadWriter(stream.Output);
 1425424                }
 1425                catch
 1426                {
 1427                    stream.Output.CompleteOutput(success: false);
 1428                    throw;
 429                }
 430
 431                // From now on, we only use payloadWriter to write and we make sure to complete it.
 432
 1425433                bool hasContinuation = request.PayloadContinuation is not null;
 434                FlushResult flushResult;
 435
 436                try
 1425437                {
 1425438                    flushResult = await payloadWriter.CopyFromAsync(
 1425439                        request.Payload,
 1425440                        stream.WritesClosed,
 1425441                        endStream: !hasContinuation,
 1425442                        invocationCts.Token).ConfigureAwait(false);
 1417443                }
 8444                catch
 8445                {
 8446                    payloadWriter.CompleteOutput(success: false);
 8447                    request.PayloadContinuation?.Complete();
 8448                    throw;
 449                }
 450                finally
 1425451                {
 1425452                    request.Payload.Complete();
 1425453                }
 454
 1417455                if (flushResult.IsCompleted || flushResult.IsCanceled || !hasContinuation)
 1405456                {
 457                    // The remote reader doesn't want more data, or the copying was canceled, or there is no
 458                    // continuation: we're done.
 1405459                    payloadWriter.CompleteOutput(!flushResult.IsCanceled);
 1405460                    request.PayloadContinuation?.Complete();
 1405461                }
 462                else
 12463                {
 464                    // Sends the payload continuation in a background thread.
 12465                    SendRequestPayloadContinuation(
 12466                        request,
 12467                        payloadWriter,
 12468                        stream.WritesClosed,
 12469                        OnGoAway,
 12470                        invocationCts.Token);
 12471                }
 472
 1417473                if (request.IsOneway)
 1008474                {
 1008475                    return new IncomingResponse(request, _connectionContext!);
 476                }
 477
 409478                Debug.Assert(streamInput is not null);
 479
 480                try
 409481                {
 409482                    ReadResult readResult = await streamInput.ReadSliceSegmentAsync(
 409483                        _maxLocalHeaderSize,
 409484                        invocationCts.Token).ConfigureAwait(false);
 485
 486                    // Nothing cancels the stream input pipe reader.
 387487                    Debug.Assert(!readResult.IsCanceled);
 488
 387489                    if (readResult.Buffer.IsEmpty)
 0490                    {
 0491                        throw new IceRpcException(
 0492                            IceRpcError.IceRpcError,
 0493                            "Received an icerpc response with an empty header.");
 494                    }
 495
387496                    (StatusCode statusCode, string? errorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? fieldsPipeReader) =
 387497                        DecodeHeader(readResult.Buffer);
 386498                    stream.Input.AdvanceTo(readResult.Buffer.End);
 499
 386500                    if (statusCode == StatusCode.TruncatedPayload && invocationCts.Token.IsCancellationRequested)
 0501                    {
 502                        // Canceling the sending of the payload continuation triggers the completion of the stream
 503                        // output. This may lead to a TruncatedPayload if the dispatch is currently reading the payload
 504                        // continuation. In such cases, we prioritize throwing an OperationCanceledException.
 0505                        fieldsPipeReader?.Complete();
 0506                        invocationCts.Token.ThrowIfCancellationRequested();
 0507                    }
 508
 386509                    var response = new IncomingResponse(
 386510                        request,
 386511                        _connectionContext!,
 386512                        statusCode,
 386513                        errorMessage,
 386514                        fields,
 386515                        fieldsPipeReader)
 386516                    {
 386517                        Payload = streamInput
 386518                    };
 519
 386520                    streamInput = null; // response now owns the stream input
 386521                    return response;
 522                }
 2523                catch (InvalidDataException exception)
 2524                {
 2525                    throw new IceRpcException(
 2526                        IceRpcError.IceRpcError,
 2527                        "Received an icerpc response with an invalid header.",
 2528                        exception);
 529                }
 530
 531                void OnGoAway(object? cts)
 13532                {
 13533                    if (!stream.IsStarted ||
 13534                        stream.Id >=
 13535                            (stream.IsBidirectional ?
 13536                                _goAwayFrame.BidirectionalStreamId :
 13537                                _goAwayFrame.UnidirectionalStreamId))
 4538                    {
 539                        // The request wasn't received by the peer so it's safe to cancel the invocation.
 4540                        ((CancellationTokenSource)cts!).Cancel();
 4541                    }
 13542                }
 543            }
 17544            catch (OperationCanceledException exception) when (exception.CancellationToken == invocationCts.Token)
 14545            {
 14546                cancellationToken.ThrowIfCancellationRequested();
 547
 7548                if (_disposedCts.IsCancellationRequested)
 4549                {
 550                    // DisposeAsync aborted the request.
 4551                    throw new IceRpcException(IceRpcError.OperationAborted);
 552                }
 553                else
 3554                {
 3555                    Debug.Assert(_goAwayCts.IsCancellationRequested);
 3556                    throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 557                }
 558            }
 559            finally
 1435560            {
 1435561                streamInput?.Complete();
 1435562                DecrementDispatchInvocationCount();
 1435563            }
 564
 565            static (StatusCode StatusCode, string? ErrorMessage, IDictionary<ResponseFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 566                ReadOnlySequence<byte> buffer)
 387567            {
 387568                var decoder = new SliceDecoder(buffer);
 569
 387570                StatusCode statusCode = decoder.DecodeStatusCode();
 387571                string? errorMessage = statusCode == StatusCode.Ok ? null : decoder.DecodeString();
 572
 387573                (IDictionary<ResponseFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 387574                    DecodeFieldDictionary(
 387575                        ref decoder,
 390576                        (ref SliceDecoder decoder) => decoder.DecodeResponseFieldKey());
 577
 386578                return (statusCode, errorMessage, fields, pipeReader);
 386579            }
 580
 581            void EncodeHeader(PipeWriter streamOutput)
 1426582            {
 1426583                var encoder = new SliceEncoder(streamOutput);
 584
 585                // Write the IceRpc request header.
 1426586                Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 587
 588                // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 1426589                long headerStartPos = streamOutput.UnflushedBytes;
 590
 1426591                var header = new IceRpcRequestHeader(request.ServiceAddress.Path, request.Operation);
 592
 1426593                header.Encode(ref encoder);
 594
 1426595                EncodeFieldDictionary(
 1426596                    request.Fields,
 14597                    (ref SliceEncoder encoder, RequestFieldKey key) => encoder.EncodeRequestFieldKey(key),
 1426598                    ref encoder,
 1426599                    streamOutput);
 600
 601                // We're done with the header encoding, write the header size.
 1426602                int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 1426603                CheckPeerHeaderSize(headerSize);
 1425604                SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 1425605            }
 1394606        }
 1435607    }
 608
 609    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 111610    {
 611        lock (_mutex)
 111612        {
 111613            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 614
 108615            if (_shutdownTask is not null)
 2616            {
 2617                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 618            }
 106619            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3620            {
 3621                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 622            }
 623
 103624            RefuseNewInvocations("The connection was shut down.");
 625
 103626            if (_streamInputOutputCount == 0)
 82627            {
 82628                _streamsCompleted.TrySetResult();
 82629            }
 103630            if (_dispatchInvocationCount == 0)
 80631            {
 80632                _dispatchesAndInvocationsCompleted.TrySetResult();
 80633            }
 634
 103635            _shutdownTask = PerformShutdownAsync();
 103636        }
 103637        return _shutdownTask;
 638
 639        async Task PerformShutdownAsync()
 103640        {
 103641            await Task.Yield(); // exit mutex lock
 642
 103643            _shutdownOrGoAwayCts.Cancel();
 644
 645            try
 103646            {
 103647                Debug.Assert(_acceptRequestsTask is not null);
 103648                Debug.Assert(_controlStream is not null);
 103649                Debug.Assert(_readGoAwayTask is not null);
 103650                Debug.Assert(_remoteControlStream is not null);
 651
 103652                await _acceptRequestsTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 653
 90654                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 655
 656                // Once shutdownTask is not null, _lastRemoteBidirectionalStreamId and _lastRemoteUnidirectionalStreamId
 657                // are immutable.
 658
 659                // When this peer is the server endpoint, the first accepted bidirectional stream ID is 0. When this
 660                // peer is the client endpoint, the first accepted bidirectional stream ID is 1.
 90661                IceRpcGoAway goAwayFrame = new(
 90662                    _lastRemoteBidirectionalStreamId is ulong value ? value + 4 : (IsServer ? 0ul : 1ul),
 90663                    (_lastRemoteUnidirectionalStreamId ?? _remoteControlStream.Id) + 4);
 664
 665                try
 90666                {
 90667                    _ = await SendControlFrameAsync(
 90668                        IceRpcControlFrameType.GoAway,
 90669                        goAwayFrame.Encode,
 90670                        cts.Token).ConfigureAwait(false);
 671
 672                    // Wait for the peer to send back a GoAway frame. The task should already be completed if the
 673                    // shutdown was initiated by the peer.
 89674                    await _readGoAwayTask.WaitAsync(cts.Token).ConfigureAwait(false);
 675
 676                    // Wait for all streams (other than the control streams) to have their Input and Output completed.
 79677                    await _streamsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 678
 679                    // Close the control stream to notify the peer that on our side, all the streams completed and that
 680                    // it can close the transport connection whenever it likes.
 78681                    _controlStream.Output.CompleteOutput(success: true);
 78682                }
 12683                catch
 12684                {
 685                    // If we fail to send the GoAway frame or some other failure occurs (such as
 686                    // OperationCanceledException), we are in an abortive closure and we close Output to allow
 687                    // the peer to continue if it's waiting for us.
 12688                    _controlStream.Output.CompleteOutput(success: false);
 12689                    throw;
 690                }
 691
 692                // Wait for the peer notification that on its side all the streams are completed. It's important to wait
 693                // for this notification before closing the connection. In particular with QUIC where closing the
 694                // connection before all the streams are processed could lead to a stream failure.
 695                try
 78696                {
 697                    // Wait for the _remoteControlStream Input completion.
 78698                    ReadResult readResult = await _remoteControlStream.Input.ReadAsync(cts.Token).ConfigureAwait(false);
 699
 75700                    Debug.Assert(!readResult.IsCanceled);
 701
 75702                    if (!readResult.IsCompleted || !readResult.Buffer.IsEmpty)
 0703                    {
 0704                        throw new IceRpcException(
 0705                            IceRpcError.IceRpcError,
 0706                            "Received bytes on the control stream after receiving the GoAway frame.");
 707                    }
 708
 709                    // We can now safely close the connection.
 75710                    await _transportConnection.CloseAsync(MultiplexedConnectionCloseError.NoError, cts.Token)
 75711                        .ConfigureAwait(false);
 74712                }
 3713                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.ConnectionClosedByPeer)
 1714                {
 715                    // Expected if the peer closed the connection first.
 1716                }
 717
 718                // We wait for the completion of the dispatches that we created (and, secondarily, invocations).
 75719                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(cts.Token).ConfigureAwait(false);
 75720            }
 9721            catch (OperationCanceledException)
 9722            {
 9723                cancellationToken.ThrowIfCancellationRequested();
 724
 3725                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3726                throw new IceRpcException(
 3727                    IceRpcError.OperationAborted,
 3728                    "The connection shutdown was aborted because the connection was disposed.");
 729            }
 0730            catch (InvalidDataException exception)
 0731            {
 0732                throw new IceRpcException(
 0733                    IceRpcError.IceRpcError,
 0734                    "The connection shutdown was aborted by an icerpc protocol error.",
 0735                    exception);
 736            }
 19737            catch (IceRpcException)
 19738            {
 19739                throw;
 740            }
 0741            catch (Exception exception)
 0742            {
 0743                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0744                throw;
 745            }
 75746        }
 103747    }
 748
 381749    internal IceRpcProtocolConnection(
 381750        IMultiplexedConnection transportConnection,
 381751        TransportConnectionInformation? transportConnectionInformation,
 381752        ConnectionOptions options,
 381753        ITaskExceptionObserver? taskExceptionObserver)
 381754    {
            // Stores the transport connection and the settings read from options, then creates the timer used for
            // the inactivity check. A non-null transportConnectionInformation makes IsServer return true.

            // Canceled when either _disposedCts or _goAwayCts is canceled.
 381755        _shutdownOrGoAwayCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, _goAwayCts.Token);
 756
 381757        _taskExceptionObserver = taskExceptionObserver;
 758
 381759        _transportConnection = transportConnection;
 381760        _dispatcher = options.Dispatcher;
 381761        _maxLocalHeaderSize = options.MaxIceRpcHeaderSize;
 381762        _transportConnectionInformation = transportConnectionInformation;
 763
            // The dispatch semaphore is only created when MaxDispatches is greater than 0. The dispatch code only
            // waits on this semaphore when it was created.
 381764        if (options.MaxDispatches > 0)
 381765        {
 381766            _dispatchSemaphore = new SemaphoreSlim(
 381767                initialCount: options.MaxDispatches,
 381768                maxCount: options.MaxDispatches);
 381769        }
 770
 381771        _inactivityTimeout = options.InactivityTimeout;
            // The timer is constructed with a callback only (infinite due time and period), so the callback does not
            // run until the timer is changed. NOTE(review): presumably armed by ScheduleInactivityCheck, which is
            // not visible here - confirm.
 381772        _inactivityTimeoutTimer = new Timer(_ =>
 5773        {
 5774            bool requestShutdown = false;
 381775
 381776            lock (_mutex)
 5777            {
                    // Request a shutdown only when no shutdown has started and there is no outstanding dispatch,
                    // invocation, or stream input/output.
 5778                if (_shutdownTask is null && _dispatchInvocationCount == 0 && _streamInputOutputCount == 0)
 5779                {
 5780                    requestShutdown = true;
 5781                    RefuseNewInvocations(
 5782                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 5783                }
 5784            }
 381785
 5786            if (requestShutdown)
 5787            {
 381788                // TrySetResult must be called outside the mutex lock
 5789                _shutdownRequestedTcs.TrySetResult();
 5790            }
 386791        });
 381792    }
 793
 794    private static (IDictionary<TKey, ReadOnlySequence<byte>>, PipeReader?) DecodeFieldDictionary<TKey>(
 795        ref SliceDecoder decoder,
 796        DecodeFunc<TKey> decodeKeyFunc) where TKey : struct
 1805797    {
            // Decodes a fields dictionary using decoder. Returns the decoded fields and, when there is at least one
            // field, the PipeReader whose buffer the field values are sliced from (null when the dictionary is empty).
            // Throws InvalidDataException when the field count or a field value size is not consistent with the
            // number of remaining bytes.
 1805798        int count = decoder.DecodeSize();
 799
 800        IDictionary<TKey, ReadOnlySequence<byte>> fields;
 801        PipeReader? pipeReader;
 1805802        if (count == 0)
 1786803        {
 1786804            fields = ImmutableDictionary<TKey, ReadOnlySequence<byte>>.Empty;
 1786805            pipeReader = null;
 1786806            decoder.CheckEndOfBuffer();
 1786807        }
 808        else
 19809        {
 810            // We don't use the normal collection allocation check here because SizeOf<ReadOnlySequence<byte>> is quite
 811            // large (24).
 812            // For example, say we decode a fields dictionary with a single field with an empty value. It's encoded
 813            // using 1 byte (dictionary size) + 1 byte (key) + 1 byte (value size) = 3 bytes. The decoder's default max
 814            // allocation size is 3 * 8 = 24. If we simply call IncreaseCollectionAllocation(1, 4 + 24), we'll exceed
 815            // the default collection allocation limit. (sizeof TKey is currently 4 but could/should increase to 8).
 816
 817            // Each field consumes at least 2 bytes: 1 for the key and one for the value size.
 19818            if ((long)count * 2 > decoder.Remaining)
 2819            {
 2820                throw new InvalidDataException("Too many fields.");
 821            }
 822
 17823            fields = new Dictionary<TKey, ReadOnlySequence<byte>>(count);
                // The encoded fields are copied into a new pipe; the field values returned to the caller are slices
                // of this pipe's read buffer. NOTE(review): CopyTo presumably copies the bytes not yet decoded -
                // confirm against SliceDecoder.
 17824            var pipe = new Pipe();
 17825            decoder.CopyTo(pipe.Writer);
 17826            pipe.Writer.Complete();
 827
 828            try
 17829            {
 17830                _ = pipe.Reader.TryRead(out ReadResult readResult);
 17831                var fieldsDecoder = new SliceDecoder(readResult.Buffer);
 832
 68833                for (int i = 0; i < count; ++i)
 17834                {
 835                    // Decode the field key.
 17836                    TKey key = decodeKeyFunc(ref fieldsDecoder);
 837
 838                    // Decode and check the field value size.
 839                    int valueSize;
 840                    try
 17841                    {
 17842                        valueSize = checked((int)fieldsDecoder.DecodeVarUInt62());
 17843                    }
 0844                    catch (OverflowException exception)
 0845                    {
 0846                        throw new InvalidDataException("The field size can't be larger than int.MaxValue.", exception);
 847                    }
 848
 17849                    if (valueSize > fieldsDecoder.Remaining)
 0850                    {
 0851                        throw new InvalidDataException(
 0852                            $"The value of field '{key}' extends beyond the end of the buffer.");
 853                    }
 854
 855                    // Create a ROS reference to the field value by slicing the fields pipe reader ROS.
 17856                    ReadOnlySequence<byte> value = readResult.Buffer.Slice(fieldsDecoder.Consumed, valueSize);
 17857                    fields.Add(key, value);
 858
 859                    // Skip the field value to prepare the decoder to read the next field value.
 17860                    fieldsDecoder.Skip(valueSize);
 17861                }
 17862                fieldsDecoder.CheckEndOfBuffer();
 863
 17864                pipe.Reader.AdvanceTo(readResult.Buffer.Start); // complete read without consuming anything
 865
 17866                pipeReader = pipe.Reader;
 17867            }
 0868            catch
 0869            {
                    // On failure the reader is not returned to the caller, so it is completed here.
 0870                pipe.Reader.Complete();
 0871                throw;
 872            }
 17873        }
 874
 875        // The caller is responsible for completing the pipe reader.
 1803876        return (fields, pipeReader);
 1803877    }
 878
 879    private async Task AcceptRequestsAsync(CancellationToken cancellationToken)
 310880    {
            // Accepts streams from the transport connection until cancellationToken is canceled, and starts a
            // DispatchRequestAsync task for each accepted stream. An OperationCanceledException ends the loop
            // silently. Any other exception refuses new invocations and completes _shutdownRequestedTcs before being
            // rethrown.
 310881        await Task.Yield(); // exit mutex lock
 882
 883        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 884        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 885        // _connectTask has completed successfully. The creation of the _acceptRequestsTask is the last action taken by
 886        // _connectTask and as a result this await can't fail.
 310887        await _connectTask!.ConfigureAwait(false);
 888
 889        try
 310890        {
 891            // We check the cancellation token for each iteration because we want to exit the accept requests loop as
 892            // soon as ShutdownAsync/GoAway requests this cancellation, even when more streams can be accepted without
 893            // waiting.
 1733894            while (!cancellationToken.IsCancellationRequested)
 1733895            {
 896                // When _dispatcher is null, the multiplexed connection MaxUnidirectionalStreams and
 897                // MaxBidirectionalStreams options are configured to not accept any request-stream from the peer. As a
 898                // result, when _dispatcher is null, this call will block indefinitely until the cancellation token is
 899                // canceled by ShutdownAsync, GoAway or DisposeAsync.
 1733900                IMultiplexedStream stream = await _transportConnection.AcceptStreamAsync(cancellationToken)
 1733901                    .ConfigureAwait(false);
 902
 903                lock (_mutex)
 1423904                {
 905                    // We don't want to increment _dispatchInvocationCount/_streamInputOutputCount when the connection
 906                    // is shutting down or being disposed.
 1423907                    if (_shutdownTask is not null)
 0908                    {
 909                        // Note that cancellationToken may not be canceled yet at this point.
 0910                        throw new OperationCanceledException();
 911                    }
 912
 913                    // The logic in IncrementStreamInputOutputCount requires that we increment the dispatch-invocation
 914                    // count first.
 1423915                    IncrementDispatchInvocationCount();
 1423916                    IncrementStreamInputOutputCount(stream.IsBidirectional);
 917
 918                    // Decorate the stream to decrement the stream input/output count on Complete.
 1423919                    stream = new MultiplexedStreamDecorator(stream, DecrementStreamInputOutputCount);
 920
 921                    // The multiplexed connection guarantees that the IDs of accepted streams of a given type have ever
 922                    // increasing values.
 923
 1423924                    if (stream.IsBidirectional)
 416925                    {
 416926                        _lastRemoteBidirectionalStreamId = stream.Id;
 416927                    }
 928                    else
 1007929                    {
 1007930                        _lastRemoteUnidirectionalStreamId = stream.Id;
 1007931                    }
 1423932                }
 933
 934                // Start a task to read the stream and dispatch the request. We pass CancellationToken.None to Task.Run
 935                // because DispatchRequestAsync must clean-up the stream and the dispatch-invocation count.
 2846936                _ = Task.Run(() => DispatchRequestAsync(stream), CancellationToken.None);
 1423937            }
 0938        }
 255939        catch (OperationCanceledException)
 255940        {
 941            // Expected, the associated cancellation token source was canceled.
 255942        }
 55943        catch (IceRpcException)
 55944        {
                // The accept loop failed: refuse new invocations and request a shutdown before rethrowing.
 55945            RefuseNewInvocations("The connection was lost");
 55946            _ = _shutdownRequestedTcs.TrySetResult();
 55947            throw;
 948        }
 0949        catch (Exception exception)
 0950        {
 0951            Debug.Fail($"The accept stream task failed with an unexpected exception: {exception}");
 0952            RefuseNewInvocations("The connection was lost");
 0953            _ = _shutdownRequestedTcs.TrySetResult();
 0954            throw;
 955        }
 255956    }
 957
 958    private void CheckPeerHeaderSize(int headerSize)
 1826959    {
            // Throws IceRpcException with IceRpcError.LimitExceeded when headerSize is greater than
            // _maxPeerHeaderSize; otherwise does nothing.
 1826960        if (headerSize > _maxPeerHeaderSize)
 2961        {
 2962            throw new IceRpcException(
 2963                IceRpcError.LimitExceeded,
 2964                $"The header size ({headerSize}) for an icerpc request or response is greater than the peer's max header
 965        }
 1824966    }
 967
 968    private void DecrementDispatchInvocationCount()
 2870969    {
            // Decrements the dispatch-invocation count while holding _mutex. When the count reaches 0:
            //  - if ShutdownAsync was called (_shutdownTask is set), completes _dispatchesAndInvocationsCompleted;
            //  - otherwise, when invocations are not refused and there is no outstanding stream input/output,
            //    schedules the inactivity check.
 970        lock (_mutex)
 2870971        {
 2870972            if (--_dispatchInvocationCount == 0)
 697973            {
 697974                if (_shutdownTask is not null)
 30975                {
 30976                    _dispatchesAndInvocationsCompleted.TrySetResult();
 30977                }
 667978                else if (!_refuseInvocations && _streamInputOutputCount == 0)
 551979                {
 551980                    ScheduleInactivityCheck();
 551981                }
 697982            }
 2870983        }
 2870984    }
 985
 986    /// <summary>Decrements the stream input/output count.</summary>
        /// <remarks>This method locks _mutex. When the count reaches 0 and ShutdownAsync was called, it completes
        /// _streamsCompleted. When the count reaches 0, no shutdown is in progress, invocations are not refused and
        /// the dispatch-invocation count is 0, it schedules the inactivity check.</remarks>
 987    private void DecrementStreamInputOutputCount()
 3678988    {
 989        lock (_mutex)
 3678990        {
 3678991            if (--_streamInputOutputCount == 0)
 686992            {
 686993                if (_shutdownTask is not null)
 28994                {
 28995                    _streamsCompleted.TrySetResult();
 28996                }
 658997                else if (!_refuseInvocations && _dispatchInvocationCount == 0)
 104998                {
 999                    // We enable the inactivity check in order to complete _shutdownRequestedTcs when inactive for too
 1000                    // long. _refuseInvocations is true when the connection is either about to be "shutdown requested",
 1001                    // or shut down / disposed. We don't need to complete _shutdownRequestedTcs in any of these
 1002                    // situations.
 1041003                    ScheduleInactivityCheck();
 1041004                }
 6861005            }
 36781006        }
 36781007    }
 1008
 1009    private async Task DispatchRequestAsync(IMultiplexedStream stream)
 14231010    {
             // Reads and decodes the request header from the stream, dispatches the request and, for a two-way
             // request (bidirectional stream), writes the response header and payload to the stream's Output.
             // When _taskExceptionObserver is not null, failures are reported to it instead of being thrown.
             // The finally block always calls DecrementDispatchInvocationCount, which releases the count that
             // AcceptRequestsAsync incremented before starting this task.

 1011        // _disposedCts is not disposed since we own a dispatch count.
 14231012        CancellationToken cancellationToken = stream.IsBidirectional ?
 14231013            stream.WritesClosed.AsCancellationToken(_disposedCts.Token) :
 14231014            _disposedCts.Token;
 1015
 14231016        PipeReader? fieldsPipeReader = null;
 1017        IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields;
 1018        IceRpcRequestHeader header;
 1019
             // streamInput and streamOutput are completed by the finally block when success is false. streamInput is
             // set to null once the IncomingRequest takes ownership of it.
 14231020        PipeReader? streamInput = stream.Input;
 14231021        PipeWriter? streamOutput = stream.IsBidirectional ? stream.Output : null;
 14231022        bool success = false;
 1023
 1024        try
 14231025        {
 1026            try
 14231027            {
 14231028                ReadResult readResult = await streamInput.ReadSliceSegmentAsync(
 14231029                    _maxLocalHeaderSize,
 14231030                    cancellationToken).ConfigureAwait(false);
 1031
 14191032                if (readResult.Buffer.IsEmpty)
 11033                {
 11034                    throw new IceRpcException(IceRpcError.IceRpcError, "Received icerpc request with empty header.");
 1035                }
 1036
 14181037                (header, fields, fieldsPipeReader) = DecodeHeader(readResult.Buffer);
 14171038                streamInput.AdvanceTo(readResult.Buffer.End);
 14171039            }
 41040            catch (InvalidDataException exception)
 41041            {
                     // An invalid header is converted into an IceRpcException, which is either thrown (and caught by
                     // the outer catch block) or reported to the task exception observer.
 41042                var rpcException = new IceRpcException(
 41043                    IceRpcError.IceRpcError,
 41044                    "Received invalid icerpc request header.",
 41045                    exception);
 1046
 41047                if (_taskExceptionObserver is null)
 11048                {
 11049                    throw rpcException;
 1050                }
 1051                else
 31052                {
 31053                    _taskExceptionObserver.DispatchRefused(
 31054                        _connectionContext!.TransportConnectionInformation,
 31055                        rpcException);
 31056                    return; // success remains false
 1057                }
 1058            }
 21059            catch (Exception exception) when (_taskExceptionObserver is not null)
 21060            {
 21061                _taskExceptionObserver.DispatchRefused(_connectionContext!.TransportConnectionInformation, exception);
 21062                return; // success remains false
 1063            }
 1064
 14171065            using var request = new IncomingRequest(Protocol.IceRpc, _connectionContext!)
 14171066            {
 14171067                Fields = fields,
 14171068                IsOneway = !stream.IsBidirectional,
 14171069                Operation = header.Operation,
 14171070                Path = header.Path,
 14171071                Payload = streamInput
 14171072            };
 1073
 14171074            streamInput = null; // the request now owns streamInput
 1075
 1076            try
 14171077            {
 14171078                OutgoingResponse response = await PerformDispatchRequestAsync(request, cancellationToken)
 14171079                    .ConfigureAwait(false);
 1080
 14081081                if (!request.IsOneway)
 4011082                {
 4011083                    Debug.Assert(streamOutput is not null);
 4011084                    EncodeHeader(response);
 1085
 3991086                    PipeWriter payloadWriter = response.GetPayloadWriter(streamOutput);
 1087
 1088                    // We give flushResult an initial "failed" value, in case the first CopyFromAsync throws.
 3991089                    var flushResult = new FlushResult(isCanceled: true, isCompleted: false);
 1090
 1091                    try
 3991092                    {
 1093                        // We don't use cancellationToken here because it's canceled shortly afterwards by the
 1094                        // completion of writesClosed. This works around https://github.com/dotnet/runtime/issues/82704
 1095                        // where the stream would otherwise be aborted after the successful write. It's also fine to
 1096                        // just use _disposedCts.Token: if writes are closed because the peer is no longer interested
 1097                        // in the response, the write operations will raise an IceRpcException(StreamAborted) which is
 1098                        // ignored.
 3991099                        bool hasContinuation = response.PayloadContinuation is not null;
 1100
 3991101                        flushResult = await payloadWriter.CopyFromAsync(
 3991102                            response.Payload,
 3991103                            stream.WritesClosed,
 3991104                            endStream: !hasContinuation,
 3991105                            _disposedCts.Token).ConfigureAwait(false);
 1106
                             // The payload continuation is only written when the first copy was neither completed
                             // nor canceled.
 3951107                        if (!flushResult.IsCompleted && !flushResult.IsCanceled && hasContinuation)
 21108                        {
 21109                            flushResult = await payloadWriter.CopyFromAsync(
 21110                                response.PayloadContinuation!,
 21111                                stream.WritesClosed,
 21112                                endStream: true,
 21113                                _disposedCts.Token).ConfigureAwait(false);
 11114                        }
 3941115                    }
 1116                    finally
 3991117                    {
 3991118                        payloadWriter.CompleteOutput(success: !flushResult.IsCanceled);
 3991119                        response.Payload.Complete();
 3991120                        response.PayloadContinuation?.Complete();
 3991121                    }
 3941122                }
 14011123            }
 161124            catch (Exception exception) when (_taskExceptionObserver is not null)
 71125            {
 71126                _taskExceptionObserver.DispatchFailed(
 71127                    request,
 71128                    _connectionContext!.TransportConnectionInformation,
 71129                    exception);
 71130                return; // success remains false
 1131            }
 14011132            success = true;
 14011133        }
 11134        catch (IceRpcException)
 11135        {
 1136            // Expected, with for example:
 1137            //  - IceRpcError.ConnectionAborted when the peer aborts the connection
 1138            //  - IceRpcError.IceRpcError when the request header is invalid
 1139            //  - IceRpcError.TruncatedData when the request header is truncated
 11140        }
 91141        catch (OperationCanceledException exception) when (
 91142            exception.CancellationToken == cancellationToken ||
 91143            exception.CancellationToken == _disposedCts.Token)
 91144        {
 1145            // Expected if the dispatch is canceled by the peer or the connection is disposed.
 91146        }
 01147        catch (Exception exception)
 01148        {
 1149            // This exception is unexpected when running the IceRPC test suite. A test that expects this exception must
 1150            // install a task exception observer.
 01151            Debug.Fail($"icerpc dispatch failed with an unexpected exception: {exception}");
 1152
 1153            // Generate unobserved task exception (UTE). If this exception is expected (e.g. an expected payload read
 1154            // exception) and the application wants to avoid this UTE, it must configure a non-null logger to install
 1155            // a task exception observer.
 01156            throw;
 1157        }
 1158        finally
 14231159        {
 14231160            if (!success)
 221161            {
 1162                // We always need to complete streamOutput when an exception is thrown. For example, we received an
 1163                // invalid request header that we could not decode.
 221164                streamOutput?.CompleteOutput(success: false);
 221165                streamInput?.Complete();
 221166            }
 14231167            fieldsPipeReader?.Complete();
 1168
 14231169            DecrementDispatchInvocationCount();
 14231170        }
 1171
 1172        async Task<OutgoingResponse> PerformDispatchRequestAsync(
 1173            IncomingRequest request,
 1174            CancellationToken cancellationToken)
 14171175        {
                 // Dispatches the request with _dispatcher, after waiting on _dispatchSemaphore when it is set.
                 // An OperationCanceledException carrying cancellationToken is rethrown. Any other exception is
                 // converted into a response with DispatchException.ToOutgoingResponse.
 14171176            Debug.Assert(_dispatcher is not null);
 1177
 1178            OutgoingResponse response;
 1179
 1180            try
 14171181            {
 14171182                if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 14171183                {
 14171184                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 14171185                }
 1186
 1187                try
 14171188                {
 14171189                    response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13971190                }
 1191                finally
 14171192                {
 14171193                    _dispatchSemaphore?.Release();
 14171194                }
 1195
 13971196                if (response != request.Response)
 11197                {
 11198                    throw new InvalidOperationException(
 11199                        "The dispatcher did not return the last response created for this request.");
 1200                }
 13961201            }
 101202            catch (OperationCanceledException exception) when (cancellationToken == exception.CancellationToken)
 91203            {
 91204                throw;
 1205            }
 121206            catch (Exception exception)
 121207            {
                     // An exception that is not a DispatchException is wrapped in a DispatchException whose status
                     // code is derived from the exception type.
 121208                if (exception is not DispatchException dispatchException)
 81209                {
 81210                    StatusCode statusCode = exception switch
 81211                    {
 11212                        InvalidDataException => StatusCode.InvalidData,
 41213                        IceRpcException iceRpcException when iceRpcException.IceRpcError == IceRpcError.TruncatedData =>
 41214                            StatusCode.TruncatedPayload,
 31215                        _ => StatusCode.InternalError
 81216                    };
 81217                    dispatchException = new DispatchException(statusCode, message: null, exception);
 81218                }
 121219                response = dispatchException.ToOutgoingResponse(request);
 121220            }
 1221
 14081222            return response;
 14081223        }
 1224
 1225        static (IceRpcRequestHeader, IDictionary<RequestFieldKey, ReadOnlySequence<byte>>, PipeReader?) DecodeHeader(
 1226            ReadOnlySequence<byte> buffer)
 14181227        {
                 // Decodes the request header followed by the request fields dictionary from buffer.
 14181228            var decoder = new SliceDecoder(buffer);
 14181229            var header = new IceRpcRequestHeader(ref decoder);
 14181230            (IDictionary<RequestFieldKey, ReadOnlySequence<byte>> fields, PipeReader? pipeReader) =
 14181231                DecodeFieldDictionary(
 14181232                    ref decoder,
 14321233                    (ref SliceDecoder decoder) => decoder.DecodeRequestFieldKey());
 1234
 14171235            return (header, fields, pipeReader);
 14171236        }
 1237
 1238        void EncodeHeader(OutgoingResponse response)
 4011239        {
                 // Encodes the response header into streamOutput: a size placeholder, the status code, the error
                 // message (only when the status code is greater than Ok) and the fields. The header size is checked
                 // with CheckPeerHeaderSize before it is written into the placeholder.
 4011240            var encoder = new SliceEncoder(streamOutput);
 1241
 1242            // Write the IceRpc response header.
 4011243            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 1244
 1245            // We use UnflushedBytes because EncodeFieldDictionary can write directly to streamOutput.
 4011246            long headerStartPos = streamOutput.UnflushedBytes;
 1247
 4011248            encoder.EncodeStatusCode(response.StatusCode);
 4011249            if (response.StatusCode > StatusCode.Ok)
 341250            {
 341251                encoder.EncodeString(response.ErrorMessage!);
 341252            }
 1253
 4011254            EncodeFieldDictionary(
 4011255                response.Fields,
 51256                (ref SliceEncoder encoder, ResponseFieldKey key) => encoder.EncodeResponseFieldKey(key),
 4011257                ref encoder,
 4011258                streamOutput);
 1259
 1260            // We're done with the header encoding, write the header size.
 4001261            int headerSize = (int)(streamOutput.UnflushedBytes - headerStartPos);
 4001262            CheckPeerHeaderSize(headerSize);
 3991263            SliceEncoder.EncodeVarUInt62((uint)headerSize, sizePlaceholder);
 3991264        }
 14231265    }
 1266
 1267    /// <summary>Encodes the fields dictionary at the end of a request or response header.</summary>
 1268    /// <remarks>This method can write bytes directly to <paramref name="output"/> without going through
 1269    /// <paramref name="encoder"/>.</remarks>
         /// <param name="fields">The fields to encode.</param>
         /// <param name="encodeKeyAction">The action used to encode each field key.</param>
         /// <param name="encoder">The encoder used to encode the dictionary, the keys and the value sizes.</param>
         /// <param name="output">The pipe writer given to the write action of a field value, when the value has one.
         /// </param>
 1270    private void EncodeFieldDictionary<TKey>(
 1271        IDictionary<TKey, OutgoingFieldValue> fields,
 1272        EncodeAction<TKey> encodeKeyAction,
 1273        ref SliceEncoder encoder,
 1274        PipeWriter output) where TKey : struct =>
 18271275        encoder.EncodeDictionary(
 18271276            fields,
 18271277            encodeKeyAction,
 18271278            (ref SliceEncoder encoder, OutgoingFieldValue value) =>
 191279                {
                         // A value with a write action is written directly to output; its size is computed afterwards
                         // from output.UnflushedBytes and encoded in the placeholder reserved before the write.
                         // Otherwise, the value's byte sequence is encoded with the encoder, preceded by its size.
 191280                    if (value.WriteAction is Action<IBufferWriter<byte>> writeAction)
 91281                    {
 91282                        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 91283                        long startPos = output.UnflushedBytes;
 91284                        writeAction(output);
 81285                        SliceEncoder.EncodeVarUInt62((ulong)(output.UnflushedBytes - startPos), sizePlaceholder);
 81286                    }
 18271287                    else
 101288                    {
 101289                        encoder.EncodeSize(checked((int)value.ByteSequence.Length));
 101290                        encoder.WriteByteSequence(value.ByteSequence);
 101291                    }
 18451292                });
 1293
 1294    /// <summary>Increments the dispatch-invocation count.</summary>
 1295    /// <remarks>This method must be called with _mutex locked.</remarks>
 1296    private void IncrementDispatchInvocationCount()
 28701297    {
             // The timer is only changed when both counts were 0 before this increment.
 28701298        if (_dispatchInvocationCount++ == 0 && _streamInputOutputCount == 0)
 6971299        {
 1300            // Cancel inactivity check.
 6971301            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 6971302        }
 28701303    }
 1304
 1305    /// <summary>Increments the stream input/output count.</summary>
 1306    /// <remarks>This method must be called with _mutex locked.</remarks>
 1307    private void IncrementStreamInputOutputCount(bool bidirectional)
 28491308    {
 28491309        Debug.Assert(_dispatchInvocationCount > 0);
 28491310        _streamInputOutputCount += bidirectional ? 2 : 1;
 28491311    }
 1312
 1313    private async Task ReadGoAwayAsync(CancellationToken cancellationToken)
 3101314    {
 3101315        await Task.Yield(); // exit mutex lock
 1316
 1317        // Wait for _connectTask (which spawned the task running this method) to complete. This await can't fail.
 1318        // This guarantees this method won't request a shutdown until after _connectTask completed successfully.
 3101319        await _connectTask!.ConfigureAwait(false);
 1320
 3101321        PipeReader remoteInput = _remoteControlStream!.Input!;
 1322
 1323        try
 3101324        {
 1325            // Wait to receive the GoAway frame.
 3101326            await ReceiveControlFrameHeaderAsync(IceRpcControlFrameType.GoAway, cancellationToken)
 3101327                .ConfigureAwait(false);
 1328
 891329            ReadResult readResult = await remoteInput.ReadSliceSegmentAsync(
 891330                MaxGoAwayFrameBodySize,
 891331                cancellationToken).ConfigureAwait(false);
 1332
 1333            // We don't call CancelPendingRead on remoteInput
 871334            Debug.Assert(!readResult.IsCanceled);
 1335
 1336            try
 871337            {
 871338                _goAwayFrame =
 1741339                    readResult.Buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new IceRpcGoAway(ref decoder));
 861340            }
 1341            finally
 871342            {
 871343                remoteInput.AdvanceTo(readResult.Buffer.End);
 871344            }
 1345
 861346            RefuseNewInvocations("The connection was shut down because it received a GoAway frame from the peer.");
 861347            _goAwayCts.Cancel();
 861348            _ = _shutdownRequestedTcs.TrySetResult();
 861349        }
 1601350        catch (OperationCanceledException)
 1601351        {
 1352            // The connection is disposed and we let this exception cancel the task.
 1601353            throw;
 1354        }
 591355        catch (IceRpcException)
 591356        {
 1357            // We let the task complete with this expected exception.
 591358            throw;
 1359        }
 51360        catch (InvalidDataException exception)
 51361        {
 1362            // "expected" in the sense it should not trigger a Debug.Fail.
 51363            throw new IceRpcException(
 51364                IceRpcError.IceRpcError,
 51365                "The ReadGoAway task was aborted by an icerpc protocol error.",
 51366                exception);
 1367        }
 01368        catch (Exception exception)
 01369        {
 01370            Debug.Fail($"The read go away task failed with an unexpected exception: {exception}");
 01371            throw;
 1372        }
 861373    }
 1374
 1375    private async ValueTask ReceiveControlFrameHeaderAsync(
 1376        IceRpcControlFrameType expectedFrameType,
 1377        CancellationToken cancellationToken)
 6321378    {
 6321379        ReadResult readResult = await _remoteControlStream!.Input.ReadAsync(cancellationToken).ConfigureAwait(false);
 1380
 1381        // We don't call CancelPendingRead on _remoteControlStream.Input.
 4081382        Debug.Assert(!readResult.IsCanceled);
 1383
 4081384        if (readResult.Buffer.IsEmpty)
 11385        {
 11386            throw new InvalidDataException(
 11387                "Failed to read the frame type because no more data is available from the control stream.");
 1388        }
 1389
 4071390        var frameType = (IceRpcControlFrameType)readResult.Buffer.FirstSpan[0];
 4071391        if (frameType != expectedFrameType)
 31392        {
 31393            throw new InvalidDataException($"Received frame type {frameType} but expected {expectedFrameType}.");
 1394        }
 4041395        _remoteControlStream!.Input.AdvanceTo(readResult.Buffer.GetPosition(1));
 4041396    }
 1397
 1398    private async ValueTask ReceiveSettingsFrameBody(CancellationToken cancellationToken)
 3151399    {
 1400        // We are still in the single-threaded initialization at this point.
 1401
 3151402        PipeReader input = _remoteControlStream!.Input;
 3151403        ReadResult readResult = await input.ReadSliceSegmentAsync(
 3151404            MaxSettingsFrameBodySize,
 3151405            cancellationToken).ConfigureAwait(false);
 1406
 1407        // We don't call CancelPendingRead on _remoteControlStream.Input
 3141408        Debug.Assert(!readResult.IsCanceled);
 1409
 1410        try
 3141411        {
 3141412            IceRpcSettings settings =
 6281413                readResult.Buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new IceRpcSettings(ref decoder));
 1414
 3111415            if (settings.Value.TryGetValue(IceRpcSettingKey.MaxHeaderSize, out ulong value))
 31416            {
 1417                // a varuint62 always fits in a long
 1418                try
 31419                {
 31420                    _maxPeerHeaderSize = ConnectionOptions.IceRpcCheckMaxHeaderSize((long)value);
 21421                }
 11422                catch (ArgumentOutOfRangeException exception)
 11423                {
 11424                    throw new InvalidDataException("Received invalid maximum header size setting.", exception);
 1425                }
 21426                _headerSizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 21427            }
 1428            // all other settings are unknown and ignored
 3101429        }
 1430        finally
 3141431        {
 3141432            input.AdvanceTo(readResult.Buffer.End);
 3141433        }
 3101434    }
 1435
 1436    private void RefuseNewInvocations(string message)
 6311437    {
 1438        lock (_mutex)
 6311439        {
 6311440            _refuseInvocations = true;
 6311441            _invocationRefusedMessage ??= message;
 6311442        }
 6311443    }
 1444
 1445    // The inactivity check executes once in _inactivityTimeout. By then either:
 1446    // - the connection is no longer inactive (and the inactivity check is canceled or being canceled)
 1447    // - the connection is still inactive and we request shutdown
 1448    private void ScheduleInactivityCheck() =>
 9651449        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1450
 1451    private ValueTask<FlushResult> SendControlFrameAsync(
 1452        IceRpcControlFrameType frameType,
 1453        EncodeAction encodeAction,
 1454        CancellationToken cancellationToken)
 4331455    {
 4331456        PipeWriter output = _controlStream!.Output;
 1457
 4331458        EncodeFrame(output);
 1459
 4331460        return output.FlushAsync(cancellationToken); // Flush
 1461
 1462        void EncodeFrame(IBufferWriter<byte> buffer)
 4331463        {
 4331464            var encoder = new SliceEncoder(buffer);
 4331465            encoder.EncodeIceRpcControlFrameType(frameType);
 4331466            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(_headerSizeLength);
 4331467            int startPos = encoder.EncodedByteCount; // does not include the size
 4331468            encodeAction.Invoke(ref encoder);
 4331469            int frameSize = encoder.EncodedByteCount - startPos;
 4331470            SliceEncoder.EncodeVarUInt62((uint)frameSize, sizePlaceholder);
 4331471        }
 4331472    }
 1473
 1474    /// <summary>Sends the payload continuation of an outgoing request in the background.</summary>
 1475    /// <remarks>We send the payload continuation on a separate thread with Task.Run: this ensures that the synchronous
 1476    /// activity that could result from reading or writing the payload continuation doesn't delay in any way the
 1477    /// caller. </remarks>
 1478    /// <param name="request">The outgoing request.</param>
 1479    /// <param name="payloadWriter">The payload writer.</param>
 1480    /// <param name="writesClosed">A task that completes when we can no longer write to payloadWriter.</param>
 1481    /// <param name="onGoAway">An action to execute with a CTS when we receive the GoAway frame from the peer.</param>
 1482    /// <param name="cancellationToken">The cancellation token of the invocation; the associated CTS is disposed when
 1483    /// the invocation completes.</param>
 1484    private void SendRequestPayloadContinuation(
 1485        OutgoingRequest request,
 1486        PipeWriter payloadWriter,
 1487        Task writesClosed,
 1488        Action<object?> onGoAway,
 1489        CancellationToken cancellationToken)
 121490    {
 121491        Debug.Assert(request.PayloadContinuation is not null);
 1492
 1493        // First "detach" the continuation.
 121494        PipeReader payloadContinuation = request.PayloadContinuation;
 121495        request.PayloadContinuation = null;
 1496
 1497        lock (_mutex)
 121498        {
 121499            Debug.Assert(_dispatchInvocationCount > 0); // as a result, can't be disposed.
 1500
 1501            // Give the task its own dispatch-invocation count. This ensures the transport connection won't be disposed
 1502            // while the continuation is being sent.
 121503            IncrementDispatchInvocationCount();
 121504        }
 1505
 1506        // This background task owns payloadContinuation, payloadWriter and 1 dispatch-invocation count, and must clean
 1507        // them up. Hence CancellationToken.None.
 121508        _ = Task.Run(PerformSendRequestPayloadContinuationAsync, CancellationToken.None);
 1509
 1510        async Task PerformSendRequestPayloadContinuationAsync()
 121511        {
 121512            bool success = false;
 1513
 1514            try
 121515            {
 1516                // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 121517                using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
 1518
 1519                // This token registration is needed for one-way requests and is redundant for two-way requests.
 1520                // We want GoAway to cancel the sending of one-way requests that have not been received by the peer,
 1521                // especially when these requests have payload continuations.
 121522                using CancellationTokenRegistration tokenRegistration = _goAwayCts.Token.UnsafeRegister(onGoAway, cts);
 1523
 1524                try
 121525                {
 1526                    // The cancellation of the InvokeAsync's cancellationToken cancels cts only until InvokeAsync's
 1527                    // PerformInvokeAsync completes. Afterwards, the cancellation of InvokeAsync's cancellationToken has
 1528                    // no effect on cts, so it doesn't cancel the copying of payloadContinuation.
 121529                    FlushResult flushResult = await payloadWriter.CopyFromAsync(
 121530                        payloadContinuation,
 121531                        writesClosed,
 121532                        endStream: true,
 121533                        cts.Token).ConfigureAwait(false);
 1534
 51535                    success = !flushResult.IsCanceled;
 51536                }
 31537                catch (OperationCanceledException exception) when (exception.CancellationToken == cts.Token)
 21538                {
 1539                    // Process/translate this exception primarily for the benefit of _taskExceptionObserver.
 1540
 1541                    // Can be because cancellationToken was canceled by DisposeAsync or GoAway; that's fine.
 21542                    cancellationToken.ThrowIfCancellationRequested();
 1543
 11544                    if (_disposedCts.IsCancellationRequested)
 01545                    {
 1546                        // DisposeAsync aborted the request.
 01547                        throw new IceRpcException(IceRpcError.OperationAborted);
 1548                    }
 1549                    else
 11550                    {
 1551                        // When _goAwayCts is canceled and onGoAway cancels its argument:
 1552                        // - if PerformInvokeAsync is no longer running (typical for a one-way request), we get here
 1553                        // - if PerformInvokeAsync is still running, we may get here or cancellationToken gets canceled
 1554                        // first.
 11555                        Debug.Assert(_goAwayCts.IsCancellationRequested);
 11556                        throw new IceRpcException(IceRpcError.InvocationCanceled, "The connection is shutting down.");
 1557                    }
 1558                }
 51559            }
 71560            catch (Exception exception) when (_taskExceptionObserver is not null)
 51561            {
 51562                _taskExceptionObserver.RequestPayloadContinuationFailed(
 51563                    request,
 51564                    _connectionContext!.TransportConnectionInformation,
 51565                    exception);
 51566            }
 11567            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 11568            {
 1569                // Expected.
 11570            }
 11571            catch (IceRpcException)
 11572            {
 1573                // Expected, with for example IceRpcError.ConnectionAborted when the peer aborts the connection.
 11574            }
 01575            catch (Exception exception)
 01576            {
 1577                // This exception is unexpected when running the IceRPC test suite. A test that expects such an
 1578                // exception must install a task exception observer.
 01579                Debug.Fail($"Failed to send payload continuation of request {request}: {exception}");
 1580
 1581                // If Debug is not enabled and there is no task exception observer, we rethrow to generate an
 1582                // Unobserved Task Exception.
 01583                throw;
 1584            }
 1585            finally
 121586            {
 121587                payloadWriter.CompleteOutput(success);
 121588                payloadContinuation.Complete();
 121589                DecrementDispatchInvocationCount();
 121590            }
 121591        }
 121592    }
 1593}

Methods/Properties

get_IsServer()
.ctor(IceRpc.Transports.IMultiplexedConnection,IceRpc.Transports.TransportConnectionInformation,IceRpc.ConnectionOptions,IceRpc.Internal.ITaskExceptionObserver)
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DisposeAsync()
PerformDisposeAsync()
InvokeAsync(IceRpc.OutgoingRequest,System.Threading.CancellationToken)
PerformInvokeAsync()
OnGoAway()
DecodeHeader()
EncodeHeader()
ShutdownAsync(System.Threading.CancellationToken)
PerformShutdownAsync()
DecodeFieldDictionary(ZeroC.Slice.Codec.SliceDecoder&,ZeroC.Slice.Codec.DecodeFunc`1<TKey>)
AcceptRequestsAsync()
CheckPeerHeaderSize(System.Int32)
DecrementDispatchInvocationCount()
DecrementStreamInputOutputCount()
DispatchRequestAsync()
PerformDispatchRequestAsync()
DecodeHeader()
EncodeHeader()
EncodeFieldDictionary(System.Collections.Generic.IDictionary`2<TKey,IceRpc.OutgoingFieldValue>,ZeroC.Slice.Codec.EncodeAction`1<TKey>,ZeroC.Slice.Codec.SliceEncoder&,System.IO.Pipelines.PipeWriter)
IncrementDispatchInvocationCount()
IncrementStreamInputOutputCount(System.Boolean)
ReadGoAwayAsync()
ReceiveControlFrameHeaderAsync()
ReceiveSettingsFrameBody()
RefuseNewInvocations(System.String)
ScheduleInactivityCheck()
SendControlFrameAsync(IceRpc.Internal.IceRpcControlFrameType,ZeroC.Slice.Codec.EncodeAction,System.Threading.CancellationToken)
EncodeFrame()
SendRequestPayloadContinuation(IceRpc.OutgoingRequest,System.IO.Pipelines.PipeWriter,System.Threading.Tasks.Task,System.Action`1<System.Object>,System.Threading.CancellationToken)
PerformSendRequestPayloadContinuationAsync()