< Summary

Information
Class: IceRpc.Internal.IceProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs
Tag: 1856_27024993493
Line coverage
89%
Covered lines: 845
Uncovered lines: 98
Coverable lines: 943
Total lines: 1585
Line coverage: 89.6%
Branch coverage
83%
Covered branches: 205
Total branches: 246
Branch coverage: 83.3%
Method coverage
100%
Covered methods: 37
Fully covered methods: 18
Total methods: 37
Method coverage: 100%
Full method coverage: 48.6%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Ice.Codec;
 4using IceRpc.Transports;
 5using IceRpc.Transports.Internal;
 6using System.Buffers;
 7using System.Collections.Immutable;
 8using System.Diagnostics;
 9using System.IO.Pipelines;
 10using System.Security.Authentication;
 11
 12namespace IceRpc.Internal;
 13
 14/// <summary>Implements <see cref="IProtocolConnection" /> for the ice protocol.</summary>
 15internal sealed class IceProtocolConnection : IProtocolConnection
 16{
 117    private static readonly IDictionary<RequestFieldKey, ReadOnlySequence<byte>> _idempotentFields =
 118        new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>
 119        {
 120            [RequestFieldKey.Idempotent] = default
 121        }.ToImmutableDictionary();
 22
 21423    private bool IsServer => _transportConnectionInformation is not null;
 24
 25    private IConnectionContext? _connectionContext; // non-null once the connection is established
 26    private Task? _connectTask;
 27    private readonly IDispatcher _dispatcher;
 28
 29    // The number of outstanding dispatches and invocations.
 30    private int _dispatchInvocationCount;
 31
 32    // We don't want the continuation to run from the dispatch or invocation thread.
 22533    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 22534        new(TaskCreationOptions.RunContinuationsAsynchronously);
 35
 36    private readonly SemaphoreSlim? _dispatchSemaphore;
 37
 38    // This cancellation token source is canceled when the connection is disposed.
 22539    private readonly CancellationTokenSource _disposedCts = new();
 40
 41    private Task? _disposeTask;
 42    private readonly IDuplexConnection _duplexConnection;
 43    private readonly DuplexConnectionReader _duplexConnectionReader;
 44    private readonly IceDuplexConnectionWriter _duplexConnectionWriter;
 22545    private bool _heartbeatEnabled = true;
 22546    private Task _heartbeatTask = Task.CompletedTask;
 47    private readonly TimeSpan _inactivityTimeout;
 48    private readonly Timer _inactivityTimeoutTimer;
 49    private string? _invocationRefusedMessage;
 50    private int _lastRequestId;
 51    private readonly int _maxFrameSize;
 22552    private readonly Lock _mutex = new();
 53    private readonly PipeOptions _pipeOptions;
 54    private Task? _readFramesTask;
 55
 56    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 57    private bool _refuseInvocations;
 58
 59    // Does ShutdownAsync send a close connection frame?
 22560    private bool _sendCloseConnectionFrame = true;
 61
 62    private Task? _shutdownTask;
 63
 64    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 65    // holding a lock on _mutex.
 22566    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 67
 68    // Only set for server connections.
 69    private readonly TransportConnectionInformation? _transportConnectionInformation;
 70
 71    private readonly CancellationTokenSource _twowayDispatchesCts;
 22572    private readonly Dictionary<int, TaskCompletionSource<PipeReader>> _twowayInvocations = new();
 73
 74    private Exception? _writeException; // protected by _writeSemaphore
 22575    private readonly SemaphoreSlim _writeSemaphore = new(1, 1);
 76
 77    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 78        CancellationToken cancellationToken)
 22479    {
 80        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 81        lock (_mutex)
 22482        {
 22483            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 84
 22285            if (_connectTask is not null)
 086            {
 087                throw new InvalidOperationException("Cannot call connect more than once.");
 88            }
 89
 22290            result = PerformConnectAsync();
 22291            _connectTask = result;
 22292        }
 22293        return result;
 94
 95        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 22296        {
 97            // Make sure we execute the function without holding the connection mutex lock.
 22298            await Task.Yield();
 99
 100            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 222101            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 222102                cancellationToken,
 222103                _disposedCts.Token);
 104
 105            TransportConnectionInformation transportConnectionInformation;
 106
 107            try
 222108            {
 109                // If the transport connection information is null, we need to connect the transport connection. It's
 110                // null for client connections. The transport connection of a server connection is established by
 111                // Server.
 222112                transportConnectionInformation = _transportConnectionInformation ??
 222113                    await _duplexConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 114
 214115                if (IsServer)
 104116                {
 117                    // Send ValidateConnection frame.
 104118                    await SendControlFrameAsync(EncodeValidateConnectionFrame, connectCts.Token).ConfigureAwait(false);
 119
 120                    // The SendControlFrameAsync is a "write" that schedules a heartbeat when the idle timeout is not
 121                    // infinite. So no need to call ScheduleHeartbeat.
 102122                }
 123                else
 110124                {
 110125                    ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 110126                        IceDefinitions.PrologueSize,
 110127                        connectCts.Token).ConfigureAwait(false);
 128
 100129                    (IcePrologue validateConnectionFrame, long consumed) = DecodeValidateConnectionFrame(buffer);
 100130                    _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed), buffer.End);
 131
 100132                    IceDefinitions.CheckPrologue(validateConnectionFrame);
 99133                    if (validateConnectionFrame.FrameSize != IceDefinitions.PrologueSize)
 0134                    {
 0135                        throw new InvalidDataException(
 0136                            $"Received ice frame with only '{validateConnectionFrame.FrameSize}' bytes.");
 137                    }
 99138                    if (validateConnectionFrame.FrameType != IceFrameType.ValidateConnection)
 0139                    {
 0140                        throw new InvalidDataException(
 0141                            $"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{valid
 142                    }
 143
 144                    // The client connection is now connected, so we schedule the first heartbeat.
 99145                    if (_duplexConnection is IceDuplexConnectionDecorator decorator)
 99146                    {
 99147                        decorator.ScheduleHeartbeat();
 99148                    }
 99149                }
 201150            }
 11151            catch (OperationCanceledException)
 11152            {
 11153                cancellationToken.ThrowIfCancellationRequested();
 154
 5155                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 5156                throw new IceRpcException(
 5157                    IceRpcError.OperationAborted,
 5158                    "The connection establishment was aborted because the connection was disposed.");
 159            }
 1160            catch (InvalidDataException exception)
 1161            {
 1162                throw new IceRpcException(
 1163                    IceRpcError.ConnectionAborted,
 1164                    "The connection was aborted by an ice protocol error.",
 1165                    exception);
 166            }
 1167            catch (AuthenticationException)
 1168            {
 1169                throw;
 170            }
 8171            catch (IceRpcException)
 8172            {
 8173                throw;
 174            }
 0175            catch (Exception exception)
 0176            {
 0177                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0178                throw;
 179            }
 180
 181            // We assign _readFramesTask with _mutex locked to make sure this assignment occurs before the start of
 182            // DisposeAsync. Once _disposeTask is not null, _readFramesTask is immutable.
 183            lock (_mutex)
 201184            {
 201185                if (_disposeTask is not null)
 0186                {
 0187                    throw new IceRpcException(
 0188                        IceRpcError.OperationAborted,
 0189                        "The connection establishment was aborted because the connection was disposed.");
 190                }
 191
 192                // This needs to be set before starting the read frames task below.
 201193                _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 194
 201195                _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 201196            }
 197
 198            // The _readFramesTask waits for this PerformConnectAsync completion before reading anything. As soon as
 199            // it receives a request, it will cancel this inactivity check.
 201200            ScheduleInactivityCheck();
 201
 201202            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 203
 204            static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 104205            {
 104206                var encoder = new IceEncoder(writer);
 104207                IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 104208            }
 209
 210            static (IcePrologue, long) DecodeValidateConnectionFrame(ReadOnlySequence<byte> buffer)
 100211            {
 100212                var decoder = new IceDecoder(buffer);
 100213                return (new IcePrologue(ref decoder), decoder.Consumed);
 100214            }
 201215        }
 222216    }
 217
 218    public ValueTask DisposeAsync()
 245219    {
 220        lock (_mutex)
 245221        {
 245222            if (_disposeTask is null)
 225223            {
 225224                RefuseNewInvocations("The connection was disposed.");
 225
 225226                _shutdownTask ??= Task.CompletedTask;
 225227                if (_dispatchInvocationCount == 0)
 216228                {
 216229                    _dispatchesAndInvocationsCompleted.TrySetResult();
 216230                }
 231
 225232                _heartbeatEnabled = false; // makes _heartbeatTask immutable
 233
 225234                _disposeTask = PerformDisposeAsync();
 225235            }
 245236        }
 245237        return new(_disposeTask);
 238
 239        async Task PerformDisposeAsync()
 225240        {
 241            // Make sure we execute the code below without holding the mutex lock.
 225242            await Task.Yield();
 243
 225244            _disposedCts.Cancel();
 245
 246            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 247
 225248            if (_connectTask is not null)
 222249            {
 250                // Wait for all writes to complete. This can't take forever since all writes are canceled by
 251                // _disposedCts.Token.
 222252                await _writeSemaphore.WaitAsync().ConfigureAwait(false);
 253
 254                try
 222255                {
 222256                    await Task.WhenAll(
 222257                        _connectTask,
 222258                        _readFramesTask ?? Task.CompletedTask,
 222259                        _heartbeatTask,
 222260                        _dispatchesAndInvocationsCompleted.Task,
 222261                        _shutdownTask).ConfigureAwait(false);
 138262                }
 84263                catch
 84264                {
 265                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 266                    // unexpected exceptions so there's no need to handle them here.
 84267                }
 222268            }
 269
 225270            _duplexConnection.Dispose();
 271
 272            // It's safe to dispose the reader/writer since no more threads are sending/receiving data.
 225273            _duplexConnectionReader.Dispose();
 225274            _duplexConnectionWriter.Dispose();
 275
 225276            _disposedCts.Dispose();
 225277            _twowayDispatchesCts.Dispose();
 278
 225279            _dispatchSemaphore?.Dispose();
 225280            _writeSemaphore.Dispose();
 225281            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 225282        }
 245283    }
 284
 285    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1398286    {
 1398287        if (request.Protocol != Protocol.Ice)
 1288        {
 1289            throw new InvalidOperationException(
 1290                $"Cannot send {request.Protocol} request on {Protocol.Ice} connection.");
 291        }
 292
 293        lock (_mutex)
 1397294        {
 1397295            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 296
 1396297            if (_refuseInvocations)
 1298            {
 1299                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 300            }
 1395301            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 0302            {
 0303                throw new InvalidOperationException("Cannot invoke on a connection that is not fully established.");
 304            }
 305
 1395306            IncrementDispatchInvocationCount();
 1395307        }
 308
 1395309        return PerformInvokeAsync();
 310
 311        async Task<IncomingResponse> PerformInvokeAsync()
 1395312        {
 313            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1395314            using var invocationCts =
 1395315                CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, cancellationToken);
 316
 1395317            PipeReader? frameReader = null;
 1395318            TaskCompletionSource<PipeReader>? responseCompletionSource = null;
 1395319            int requestId = 0;
 320
 321            try
 1395322            {
 323                // Read the full payload. This can take some time so this needs to be done before acquiring the write
 324                // semaphore.
 1395325                ReadOnlySequence<byte> payloadBuffer = await ReadFullPayloadAsync(request.Payload, invocationCts.Token)
 1395326                    .ConfigureAwait(false);
 327
 328                try
 1395329                {
 330                    // Wait for the writing of other frames to complete.
 1395331                    using SemaphoreLock _ = await AcquireWriteLockAsync(invocationCts.Token).ConfigureAwait(false);
 332
 333                    // Assign the request ID for two-way invocations and keep track of the invocation for receiving the
 334                    // response. The request ID is only assigned once the write semaphore is acquired. We don't want a
 335                    // canceled request to allocate a request ID that won't be used.
 336                    lock (_mutex)
 1395337                    {
 1395338                        if (_refuseInvocations)
 0339                        {
 340                            // It's InvocationCanceled and not InvocationRefused because we've read the payload.
 0341                            throw new IceRpcException(IceRpcError.InvocationCanceled, _invocationRefusedMessage);
 342                        }
 343
 1395344                        if (!request.IsOneway)
 389345                        {
 346                            // wrap around back to 1 if we reach int.MaxValue. 0 means one-way.
 389347                            _lastRequestId = _lastRequestId == int.MaxValue ? 1 : _lastRequestId + 1;
 389348                            requestId = _lastRequestId;
 349
 350                            // RunContinuationsAsynchronously because we don't want the "read frames loop" to run the
 351                            // continuation.
 389352                            responseCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
 389353                            _twowayInvocations[requestId] = responseCompletionSource;
 389354                        }
 1395355                    }
 356
 1395357                    int payloadSize = checked((int)payloadBuffer.Length);
 358
 359                    try
 1395360                    {
 1395361                        EncodeRequestHeader(_duplexConnectionWriter, request, requestId, payloadSize);
 362
 363                        // We write to the duplex connection with _disposedCts.Token instead of invocationCts.Token.
 364                        // Canceling this write operation is fatal to the connection.
 1395365                        await _duplexConnectionWriter.WriteAsync(payloadBuffer, _disposedCts.Token)
 1395366                            .ConfigureAwait(false);
 1394367                    }
 1368                    catch (Exception exception)
 1369                    {
 1370                        WriteFailed(exception);
 1371                        throw;
 372                    }
 1394373                }
 1374                catch (IceRpcException exception) when (exception.IceRpcError != IceRpcError.InvocationCanceled)
 1375                {
 376                    // Since we could not send the request, the server cannot dispatch it and it's safe to retry.
 377                    // This includes the situation where await AcquireWriteLockAsync throws because a previous write
 378                    // failed.
 1379                    throw new IceRpcException(
 1380                        IceRpcError.InvocationCanceled,
 1381                        "Failed to send ice request.",
 1382                        exception);
 383                }
 384                finally
 1395385                {
 386                    // We've read the payload (see ReadFullPayloadAsync) and we are now done with it.
 1395387                    request.Payload.Complete();
 1395388                }
 389
 1394390                if (request.IsOneway)
 1006391                {
 392                    // We're done, there's no response for one-way requests.
 1006393                    return new IncomingResponse(request, _connectionContext!);
 394                }
 395
 396                // Wait to receive the response.
 388397                Debug.Assert(responseCompletionSource is not null);
 388398                frameReader = await responseCompletionSource.Task.WaitAsync(invocationCts.Token).ConfigureAwait(false);
 399
 369400                if (!frameReader.TryRead(out ReadResult readResult))
 0401                {
 0402                    throw new InvalidDataException($"Received empty response frame for request with id '{requestId}'.");
 403                }
 404
 369405                Debug.Assert(readResult.IsCompleted);
 406
 369407                (StatusCode statusCode, string? errorMessage, SequencePosition consumed) =
 369408                    DecodeResponseHeader(readResult.Buffer, requestId);
 409
 369410                frameReader.AdvanceTo(consumed);
 411
 369412                var response = new IncomingResponse(
 369413                    request,
 369414                    _connectionContext!,
 369415                    statusCode,
 369416                    errorMessage)
 369417                {
 369418                    Payload = frameReader
 369419                };
 420
 369421                frameReader = null; // response now owns frameReader
 369422                return response;
 423            }
 9424            catch (OperationCanceledException)
 9425            {
 9426                cancellationToken.ThrowIfCancellationRequested();
 427
 3428                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3429                throw new IceRpcException(
 3430                    IceRpcError.OperationAborted,
 3431                    "The invocation was aborted because the connection was disposed.");
 432            }
 433            finally
 1395434            {
 435                // If responseCompletionSource is not completed, we want to complete it to prevent another method from
 436                // setting an unobservable exception in it. And if it's already completed with an exception, we observe
 437                // this exception.
 1395438                if (responseCompletionSource is not null &&
 1395439                    !responseCompletionSource.TrySetResult(InvalidPipeReader.Instance))
 379440                {
 441                    try
 379442                    {
 379443                        _ = await responseCompletionSource.Task.ConfigureAwait(false);
 369444                    }
 10445                    catch
 10446                    {
 447                        // observe exception, if any
 10448                    }
 379449                }
 450
 451                lock (_mutex)
 1395452                {
 453                    // Unregister the two-way invocation if registered.
 1395454                    if (requestId > 0 && !_refuseInvocations)
 368455                    {
 368456                        _twowayInvocations.Remove(requestId);
 368457                    }
 458
 1395459                    DecrementDispatchInvocationCount();
 1395460                }
 461
 1395462                frameReader?.Complete();
 1395463            }
 0464        }
 2770465    }
 466
 467    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 69468    {
 469        lock (_mutex)
 69470        {
 69471            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 472
 67473            if (_shutdownTask is not null)
 0474            {
 0475                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 476            }
 67477            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3478            {
 3479                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 480            }
 481
 64482            RefuseNewInvocations("The connection was shut down.");
 483
 64484            if (_dispatchInvocationCount == 0)
 51485            {
 51486                _dispatchesAndInvocationsCompleted.TrySetResult();
 51487            }
 64488            _shutdownTask = PerformShutdownAsync(_sendCloseConnectionFrame);
 64489        }
 490
 64491        return _shutdownTask;
 492
 493        async Task PerformShutdownAsync(bool sendCloseConnectionFrame)
 64494        {
 64495            await Task.Yield(); // exit mutex lock
 496
 497            try
 64498            {
 64499                Debug.Assert(_readFramesTask is not null);
 500
 501                // Since DisposeAsync waits for the _shutdownTask completion, _disposedCts is not disposed at this
 502                // point.
 64503                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(
 64504                    cancellationToken,
 64505                    _disposedCts.Token);
 506
 507                // Wait for dispatches and invocations to complete.
 64508                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 509
 510                // Stops sending heartbeats. We can't do earlier: while we're waiting for dispatches and invocations to
 511                // complete, we need to keep sending heartbeats otherwise the peer could see the connection as idle and
 512                // abort it.
 513                lock (_mutex)
 60514                {
 60515                    _heartbeatEnabled = false; // makes _heartbeatTask immutable
 60516                }
 517
 518                // Wait for the last send heartbeat to complete before sending the CloseConnection frame or disposing
 519                // the duplex connection. _heartbeatTask is immutable once _shutdownTask set. _heartbeatTask can be
 520                // canceled by DisposeAsync.
 60521                await _heartbeatTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 522
 60523                if (sendCloseConnectionFrame)
 27524                {
 525                    // Send CloseConnection frame.
 27526                    await SendControlFrameAsync(EncodeCloseConnectionFrame, shutdownCts.Token).ConfigureAwait(false);
 527
 528                    // Wait for the peer to abort the connection as an acknowledgment for this CloseConnection frame.
 529                    // The peer can also send us a CloseConnection frame if it started shutting down at the same time.
 530                    // We can't just return and dispose the duplex connection since the peer can still be reading frames
 531                    // (including the CloseConnection frame) and we don't want to abort this reading.
 25532                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 24533                }
 534                else
 33535                {
 536                    // _readFramesTask should be already completed or nearly completed.
 33537                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 538
 539                    // _readFramesTask succeeded means the peer is waiting for us to abort the duplex connection;
 540                    // we oblige.
 23541                    _duplexConnection.Dispose();
 23542                }
 47543            }
 6544            catch (OperationCanceledException)
 6545            {
 6546                cancellationToken.ThrowIfCancellationRequested();
 547
 3548                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3549                throw new IceRpcException(
 3550                    IceRpcError.OperationAborted,
 3551                    "The connection shutdown was aborted because the connection was disposed.");
 552            }
 11553            catch (IceRpcException)
 11554            {
 11555                throw;
 556            }
 0557            catch (Exception exception)
 0558            {
 0559                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0560                throw;
 561            }
 562
 563            static void EncodeCloseConnectionFrame(IBufferWriter<byte> writer)
 26564            {
 26565                var encoder = new IceEncoder(writer);
 26566                IceDefinitions.CloseConnectionFrame.Encode(ref encoder);
 26567            }
 47568        }
 64569    }
 570
 225571    internal IceProtocolConnection(
 225572        IDuplexConnection duplexConnection,
 225573        TransportConnectionInformation? transportConnectionInformation,
 225574        ConnectionOptions options)
 225575    {
 225576        _twowayDispatchesCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 577
 578        // With ice, we always listen for incoming frames (responses) so we need a dispatcher for incoming requests even
 579        // if we don't expect any. This dispatcher throws an ice ObjectNotExistException back to the client, which makes
 580        // more sense than throwing an UnknownException.
 225581        _dispatcher = options.Dispatcher ?? NotFoundDispatcher.Instance;
 582
 225583        _maxFrameSize = options.MaxIceFrameSize;
 225584        _transportConnectionInformation = transportConnectionInformation;
 585
 225586        if (options.MaxDispatches > 0)
 225587        {
 225588            _dispatchSemaphore = new SemaphoreSlim(
 225589                initialCount: options.MaxDispatches,
 225590                maxCount: options.MaxDispatches);
 225591        }
 592
 225593        _inactivityTimeout = options.InactivityTimeout;
 594
 595        // The readerScheduler doesn't matter (we don't call pipe.Reader.ReadAsync on the resulting pipe), and the
 596        // writerScheduler doesn't matter (pipe.Writer.FlushAsync never blocks).
 225597        _pipeOptions = new PipeOptions(
 225598            pool: options.Pool,
 225599            minimumSegmentSize: options.MinSegmentSize,
 225600            pauseWriterThreshold: 0,
 225601            useSynchronizationContext: false);
 602
 225603        if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
 225604        {
 225605            duplexConnection = new IceDuplexConnectionDecorator(
 225606                duplexConnection,
 225607                readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
 225608                writeIdleTimeout: options.IceIdleTimeout,
 225609                SendHeartbeat);
 225610        }
 611
 225612        _duplexConnection = duplexConnection;
 225613        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 225614        _duplexConnectionWriter =
 225615            new IceDuplexConnectionWriter(_duplexConnection, options.Pool, options.MinSegmentSize);
 616
 225617        _inactivityTimeoutTimer = new Timer(_ =>
 5618        {
 5619            bool requestShutdown = false;
 225620
 225621            lock (_mutex)
 5622            {
 5623                if (_dispatchInvocationCount == 0 && _shutdownTask is null)
 5624                {
 5625                    requestShutdown = true;
 5626                    RefuseNewInvocations(
 5627                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 5628                }
 5629            }
 225630
 5631            if (requestShutdown)
 5632            {
 225633                // TrySetResult must be called outside the mutex lock.
 5634                _shutdownRequestedTcs.TrySetResult();
 5635            }
 230636        });
 637
 638        void SendHeartbeat()
 11639        {
 640            lock (_mutex)
 11641            {
 11642                if (_heartbeatTask.IsCompletedSuccessfully && _heartbeatEnabled)
 11643                {
 11644                    _heartbeatTask = SendValidateConnectionFrameAsync(_disposedCts.Token);
 11645                }
 11646            }
 647
 648            async Task SendValidateConnectionFrameAsync(CancellationToken cancellationToken)
 11649            {
 650                // Make sure we execute the function without holding the connection mutex lock.
 11651                await Task.Yield();
 652
 653                try
 11654                {
 11655                    await SendControlFrameAsync(EncodeValidateConnectionFrame, cancellationToken).ConfigureAwait(false);
 11656                }
 0657                catch (OperationCanceledException)
 0658                {
 659                    // Canceled by DisposeAsync
 0660                    throw;
 661                }
 0662                catch (IceRpcException)
 0663                {
 664                    // Expected, typically the peer aborted the connection.
 0665                    throw;
 666                }
 0667                catch (Exception exception)
 0668                {
 0669                    Debug.Fail($"The heartbeat task completed due to an unhandled exception: {exception}");
 0670                    throw;
 671                }
 672
 673                static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 11674                {
 11675                    var encoder = new IceEncoder(writer);
 11676                    IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 11677                }
 11678            }
 11679        }
 225680    }
 681
 682    private static (int RequestId, IceRequestHeader Header, PipeReader? ContextReader, int Consumed) DecodeRequestIdAndH
 683        ReadOnlySequence<byte> buffer)
 1393684    {
 1393685        var decoder = new IceDecoder(buffer);
 686
 1393687        int requestId = decoder.DecodeInt();
 688
 1393689        var requestHeader = new IceRequestHeader(ref decoder);
 1393690        requestHeader.Facet.CheckFacetCount();
 691
 1393692        Pipe? contextPipe = null;
 1393693        long pos = decoder.Consumed;
 1393694        int count = decoder.DecodeSize();
 1393695        if (count > 0)
 7696        {
 28697            for (int i = 0; i < count; ++i)
 7698            {
 7699                decoder.Skip(decoder.DecodeSize()); // Skip the key
 7700                decoder.Skip(decoder.DecodeSize()); // Skip the value
 7701            }
 7702            contextPipe = new Pipe();
 7703            contextPipe.Writer.Write(buffer.Slice(pos, decoder.Consumed - pos));
 7704            contextPipe.Writer.Complete();
 7705        }
 706
 1393707        var encapsulationHeader = new EncapsulationHeader(ref decoder);
 708
 1393709        if (encapsulationHeader.PayloadEncodingMajor != 1 ||
 1393710            encapsulationHeader.PayloadEncodingMinor != 1)
 0711        {
 0712            throw new InvalidDataException(
 0713                $"Unsupported payload encoding '{encapsulationHeader.PayloadEncodingMajor}.{encapsulationHeader.PayloadE
 714        }
 715
 1393716        int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 1393717        if (payloadSize != (buffer.Length - decoder.Consumed))
 0718        {
 0719            throw new InvalidDataException(
 0720                $"Request payload size mismatch: expected {payloadSize} bytes, read {buffer.Length - decoder.Consumed} b
 721        }
 722
 1393723        return (requestId, requestHeader, contextPipe?.Reader, (int)decoder.Consumed);
 1393724    }
 725
 726    private static (StatusCode StatusCode, string? ErrorMessage, SequencePosition Consumed) DecodeResponseHeader(
 727        ReadOnlySequence<byte> buffer,
 728        int requestId)
 369729    {
 369730        var replyStatus = (ReplyStatus)buffer.FirstSpan[0];
 731
 369732        if (replyStatus <= ReplyStatus.UserException)
 333733        {
 734            const int headerSize = 7; // reply status byte + encapsulation header
 735
 736            // read and check encapsulation header (6 bytes long)
 737
 333738            if (buffer.Length < headerSize)
 0739            {
 0740                throw new InvalidDataException($"Received invalid frame header for request with id '{requestId}'.");
 741            }
 742
 333743            EncapsulationHeader encapsulationHeader =
 666744                buffer.Slice(1, 6).DecodeIceBuffer((ref IceDecoder decoder) => new EncapsulationHeader(ref decoder));
 745
 746            // Sanity check
 333747            int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 333748            if (payloadSize != buffer.Length - headerSize)
 0749            {
 0750                throw new InvalidDataException(
 0751                    $"Response payload size/frame size mismatch: payload size is {payloadSize} bytes but frame has {buff
 752            }
 753
 333754            SequencePosition consumed = buffer.GetPosition(headerSize);
 755
 333756            return replyStatus == ReplyStatus.Ok ? (StatusCode.Ok, null, consumed) :
 333757                // Set the error message to the empty string, because null is not allowed for status code > Ok.
 333758                (StatusCode.ApplicationError, "", consumed);
 759        }
 760        else
 36761        {
 762            // An ice system exception.
 763
 36764            StatusCode statusCode = replyStatus switch
 36765            {
 14766                ReplyStatus.ObjectNotExist => StatusCode.NotFound,
 0767                ReplyStatus.FacetNotExist => StatusCode.NotFound,
 2768                ReplyStatus.OperationNotExist => StatusCode.NotImplemented,
 4769                ReplyStatus.InvalidData => StatusCode.InvalidData,
 1770                ReplyStatus.Unauthorized => StatusCode.Unauthorized,
 1771                ReplyStatus.NotSupported => StatusCode.NotSupported,
 14772                _ => StatusCode.InternalError
 36773            };
 774
 36775            var decoder = new IceDecoder(buffer.Slice(1));
 776
 777            string message;
 36778            switch (replyStatus)
 779            {
 780                case ReplyStatus.FacetNotExist:
 781                case ReplyStatus.ObjectNotExist:
 782                case ReplyStatus.OperationNotExist:
 783
 16784                    var requestFailed = new RequestFailedExceptionData(ref decoder);
 785
 16786                    string target = requestFailed.Facet.Count > 0 ?
 16787                        $"{requestFailed.Identity.ToPath()}#{requestFailed.Facet.ToFragment()}" : requestFailed.Identity
 788
 16789                    message =
 16790                        $"The dispatch failed with status code {statusCode} while dispatching '{requestFailed.Operation}
 16791                    break;
 792                default:
 20793                    message = decoder.DecodeString();
 20794                    break;
 795            }
 36796            decoder.CheckEndOfBuffer();
 36797            return (statusCode, message, buffer.End);
 798        }
 369799    }
 800
 801    private static void EncodeRequestHeader(
 802        IceDuplexConnectionWriter output,
 803        OutgoingRequest request,
 804        int requestId,
 805        int payloadSize)
 1395806    {
 1395807        var encoder = new IceEncoder(output);
 808
 809        // Write the request header.
 1395810        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 1395811        encoder.EncodeIceFrameType(IceFrameType.Request);
 1395812        encoder.EncodeByte(0); // compression status
 813
 1395814        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 815
 1395816        encoder.EncodeInt(requestId);
 817
 1395818        byte encodingMajor = 1;
 1395819        byte encodingMinor = 1;
 820
 821        // Request header.
 1395822        var requestHeader = new IceRequestHeader(
 1395823            IceIdentity.Parse(request.ServiceAddress.Path),
 1395824            request.ServiceAddress.Fragment.ToFacet(),
 1395825            request.Operation,
 1395826            request.Fields.ContainsKey(RequestFieldKey.Idempotent) ? OperationMode.Idempotent : OperationMode.Normal);
 1395827        requestHeader.Encode(ref encoder);
 1395828        int directWriteSize = 0;
 1395829        if (request.Fields.TryGetValue(RequestFieldKey.Context, out OutgoingFieldValue requestField))
 7830        {
 7831            if (requestField.WriteAction is Action<IBufferWriter<byte>> writeAction)
 7832            {
 833                // This writes directly to the underlying output; we measure how many bytes are written.
 7834                long start = output.UnflushedBytes;
 7835                writeAction(output);
 7836                directWriteSize = (int)(output.UnflushedBytes - start);
 7837            }
 838            else
 0839            {
 0840                encoder.WriteByteSequence(requestField.ByteSequence);
 0841            }
 7842        }
 843        else
 1388844        {
 1388845            encoder.EncodeSize(0);
 1388846        }
 847
 848        // We ignore all other fields. They can't be sent over ice.
 849
 1395850        new EncapsulationHeader(
 1395851            encapsulationSize: payloadSize + 6,
 1395852            encodingMajor,
 1395853            encodingMinor).Encode(ref encoder);
 854
 1395855        int frameSize = checked(encoder.EncodedByteCount + directWriteSize + payloadSize);
 1395856        IceEncoder.EncodeInt(frameSize, sizePlaceholder);
 1395857    }
 858
 859    private static void EncodeResponseHeader(
 860        IBufferWriter<byte> writer,
 861        OutgoingResponse response,
 862        IncomingRequest request,
 863        int requestId,
 864        int payloadSize)
 1380865    {
 1380866        var encoder = new IceEncoder(writer);
 867
 868        // Write the response header.
 869
 1380870        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 1380871        encoder.EncodeIceFrameType(IceFrameType.Reply);
 1380872        encoder.EncodeByte(0); // compression status
 1380873        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 874
 1380875        encoder.EncodeInt(requestId);
 876
 1380877        if (response.StatusCode > StatusCode.ApplicationError ||
 1380878            (response.StatusCode == StatusCode.ApplicationError && payloadSize == 0))
 38879        {
 880            // system exception
 38881            switch (response.StatusCode)
 882            {
 883                case StatusCode.NotFound:
 884                case StatusCode.NotImplemented:
 18885                    encoder.EncodeReplyStatus(response.StatusCode == StatusCode.NotFound ?
 18886                        ReplyStatus.ObjectNotExist : ReplyStatus.OperationNotExist);
 887
 18888                    new RequestFailedExceptionData(
 18889                        IceIdentity.Parse(request.Path),
 18890                        request.Fragment.ToFacet(),
 18891                        request.Operation).Encode(ref encoder);
 18892                    break;
 893                case StatusCode.InternalError:
 8894                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 8895                    encoder.EncodeString(response.ErrorMessage!);
 8896                    break;
 897                case StatusCode.InvalidData:
 4898                    encoder.EncodeReplyStatus(ReplyStatus.InvalidData);
 4899                    encoder.EncodeString(response.ErrorMessage!);
 4900                    break;
 901                case StatusCode.Unauthorized:
 1902                    encoder.EncodeReplyStatus(ReplyStatus.Unauthorized);
 1903                    encoder.EncodeString(response.ErrorMessage!);
 1904                    break;
 905                case StatusCode.NotSupported:
 1906                    encoder.EncodeReplyStatus(ReplyStatus.NotSupported);
 1907                    encoder.EncodeString(response.ErrorMessage!);
 1908                    break;
 909                default:
 6910                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 6911                    encoder.EncodeString(
 6912                        $"{response.ErrorMessage} {{ Original StatusCode = {response.StatusCode} }}");
 6913                    break;
 914            }
 38915        }
 916        else
 1342917        {
 1342918            encoder.EncodeReplyStatus((ReplyStatus)response.StatusCode);
 919
 920            // When IceRPC receives a response, it ignores the response encoding. So this "1.1" is only relevant to
 921            // a ZeroC Ice client that decodes the response. The only Slice encoding such a client can possibly use
 922            // to decode the response payload is 1.1 or 1.0, and we don't care about interop with 1.0.
 1342923            var encapsulationHeader = new EncapsulationHeader(
 1342924                encapsulationSize: payloadSize + 6,
 1342925                payloadEncodingMajor: 1,
 1342926                payloadEncodingMinor: 1);
 1342927            encapsulationHeader.Encode(ref encoder);
 1342928        }
 929
 1380930        int frameSize = encoder.EncodedByteCount + payloadSize;
 1380931        IceEncoder.EncodeInt(frameSize, sizePlaceholder);
 1380932    }
 933
 934    /// <summary>Reads the full Ice payload from the given pipe reader.</summary>
 935    private static async ValueTask<ReadOnlySequence<byte>> ReadFullPayloadAsync(
 936        PipeReader payload,
 937        CancellationToken cancellationToken)
 2742938    {
 939        // We use ReadAtLeastAsync instead of ReadAsync to bypass the PauseWriterThreshold when the payload is
 940        // backed by a Pipe.
 2742941        ReadResult readResult = await payload.ReadAtLeastAsync(int.MaxValue, cancellationToken).ConfigureAwait(false);
 942
 2739943        if (readResult.IsCanceled)
 0944        {
 0945            throw new InvalidOperationException("Unexpected call to CancelPendingRead on ice payload.");
 946        }
 947
 2739948        return readResult.IsCompleted ? readResult.Buffer :
 2739949            throw new ArgumentException("The payload size is greater than int.MaxValue.", nameof(payload));
 2739950    }
 951
 952    /// <summary>Acquires exclusive access to _duplexConnectionWriter.</summary>
 953    /// <returns>A <see cref="SemaphoreLock" /> that releases the acquired semaphore in its Dispose method.</returns>
 954    private async ValueTask<SemaphoreLock> AcquireWriteLockAsync(CancellationToken cancellationToken)
 2917955    {
 2917956        SemaphoreLock semaphoreLock = await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false);
 957
 958        // _writeException is protected by _writeSemaphore
 2917959        if (_writeException is not null)
 1960        {
 1961            semaphoreLock.Dispose();
 962
 1963            throw new IceRpcException(
 1964                IceRpcError.ConnectionAborted,
 1965                "The connection was aborted because a previous write operation failed.",
 1966                _writeException);
 967        }
 968
 2916969        return semaphoreLock;
 2916970    }
 971
 972    /// <summary>Creates a pipe reader to simplify the reading of a request or response frame. The frame is read fully
 973    /// and buffered into an internal pipe.</summary>
 974    private async ValueTask<PipeReader> CreateFrameReaderAsync(int size, CancellationToken cancellationToken)
 2766975    {
 2766976        var pipe = new Pipe(_pipeOptions);
 977
 978        try
 2766979        {
 2766980            await _duplexConnectionReader.FillBufferWriterAsync(pipe.Writer, size, cancellationToken)
 2766981                .ConfigureAwait(false);
 2766982        }
 0983        catch
 0984        {
 0985            pipe.Reader.Complete();
 0986            throw;
 987        }
 988        finally
 2766989        {
 2766990            pipe.Writer.Complete();
 2766991        }
 992
 2766993        return pipe.Reader;
 2766994    }
 995
 996    private void DecrementDispatchInvocationCount()
 2786997    {
 998        lock (_mutex)
 2786999        {
 27861000            if (--_dispatchInvocationCount == 0)
 12411001            {
 12411002                if (_shutdownTask is not null)
 201003                {
 201004                    _dispatchesAndInvocationsCompleted.TrySetResult();
 201005                }
 1006                // We enable the inactivity check in order to complete ShutdownRequested when inactive for too long.
 1007                // _refuseInvocations is true when the connection is either about to be "shutdown requested", or shut
 1008                // down / disposed. We don't need to complete ShutdownRequested in any of these situations.
 12211009                else if (!_refuseInvocations)
 12071010                {
 12071011                    ScheduleInactivityCheck();
 12071012                }
 12411013            }
 27861014        }
 27861015    }
 1016
 1017    /// <summary>Dispatches an incoming request. This method executes in a task spawn from the read frames loop.
 1018    /// </summary>
 1019    private async Task DispatchRequestAsync(IncomingRequest request, int requestId, PipeReader? contextReader)
 13911020    {
 13911021        CancellationToken cancellationToken = request.IsOneway ? _disposedCts.Token : _twowayDispatchesCts.Token;
 1022
 1023        OutgoingResponse? response;
 1024        try
 13911025        {
 1026            // The dispatcher can complete the incoming request payload to release its memory as soon as possible.
 1027            try
 13911028            {
 1029                // _dispatcher.DispatchAsync may very well ignore the cancellation token and we don't want to keep
 1030                // dispatching when the cancellation token is canceled.
 13911031                cancellationToken.ThrowIfCancellationRequested();
 1032
 13911033                response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13721034            }
 1035            finally
 13911036            {
 13911037                _dispatchSemaphore?.Release();
 13911038            }
 1039
 13721040            if (response != request.Response)
 11041            {
 11042                throw new InvalidOperationException(
 11043                    "The dispatcher did not return the last response created for this request.");
 1044            }
 13711045        }
 201046        catch when (request.IsOneway)
 01047        {
 1048            // ignored since we're not returning anything
 01049            response = null;
 01050        }
 101051        catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 91052        {
 1053            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 91054            response = null;
 91055        }
 111056        catch (Exception exception)
 111057        {
 111058            if (exception is not DispatchException dispatchException)
 71059            {
 71060                StatusCode statusCode = exception is InvalidDataException ?
 71061                    StatusCode.InvalidData : StatusCode.InternalError;
 71062                dispatchException = new DispatchException(statusCode, innerException: exception);
 71063            }
 111064            response = dispatchException.ToOutgoingResponse(request);
 111065        }
 1066        finally
 13911067        {
 13911068            request.Payload.Complete();
 13911069            contextReader?.Complete();
 1070
 1071            // The field values are now invalid - they point to potentially recycled and reused memory. We
 1072            // replace Fields by an empty dictionary to prevent accidental access to this reused memory.
 13911073            request.Fields = ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty;
 13911074        }
 1075
 1076        try
 13911077        {
 13911078            if (response is not null)
 13821079            {
 1080                // Read the full response payload. This can take some time so this needs to be done before acquiring
 1081                // the write semaphore.
 13821082                ReadOnlySequence<byte> payload = ReadOnlySequence<byte>.Empty;
 1083
 13821084                if (response.StatusCode <= StatusCode.ApplicationError)
 13471085                {
 1086                    try
 13471087                    {
 13471088                        payload = await ReadFullPayloadAsync(response.Payload, cancellationToken)
 13471089                            .ConfigureAwait(false);
 13441090                    }
 21091                    catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 21092                    {
 21093                        throw;
 1094                    }
 11095                    catch (Exception exception)
 11096                    {
 1097                        // We "encode" the exception in the error message.
 1098
 11099                        response = new OutgoingResponse(
 11100                            request,
 11101                            StatusCode.InternalError,
 11102                            "The dispatch failed to read the response payload.",
 11103                            exception);
 11104                    }
 13451105                }
 1106                // else payload remains empty because the payload of a dispatch exception (if any) cannot be sent
 1107                // over ice.
 1108
 13801109                int payloadSize = checked((int)payload.Length);
 1110
 1111                // Wait for writing of other frames to complete.
 13801112                using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1113                try
 13801114                {
 13801115                    EncodeResponseHeader(_duplexConnectionWriter, response, request, requestId, payloadSize);
 1116
 1117                    // We write to the duplex connection with _disposedCts.Token instead of cancellationToken.
 1118                    // Canceling this write operation is fatal to the connection.
 13801119                    await _duplexConnectionWriter.WriteAsync(payload, _disposedCts.Token).ConfigureAwait(false);
 13791120                }
 11121                catch (Exception exception)
 11122                {
 11123                    WriteFailed(exception);
 11124                    throw;
 1125                }
 13791126            }
 13881127        }
 31128        catch (OperationCanceledException exception) when (
 31129            exception.CancellationToken == _disposedCts.Token ||
 31130            exception.CancellationToken == cancellationToken)
 31131        {
 1132            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 31133        }
 1134        finally
 13911135        {
 13911136            DecrementDispatchInvocationCount();
 13911137        }
 13911138    }
 1139
 1140    /// <summary>Increments the dispatch-invocation count.</summary>
 1141    /// <remarks>This method must be called with _mutex locked.</remarks>
 1142    private void IncrementDispatchInvocationCount()
 27861143    {
 27861144        if (_dispatchInvocationCount++ == 0)
 12411145        {
 1146            // Cancel inactivity check.
 12411147            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 12411148        }
 27861149    }
 1150
 1151    private void ScheduleInactivityCheck() =>
 14081152        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1153
 1154    /// <summary>Reads incoming frames and returns successfully when a CloseConnection frame is received or when the
 1155    /// connection is aborted during ShutdownAsync or canceled by DisposeAsync.</summary>
 1156    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 2011157    {
 2011158        await Task.Yield(); // exit mutex lock
 1159
 1160        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 1161        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 1162        // _connectTask has completed successfully. The creation of the _readFramesTask is the last action taken by
 1163        // _connectTask and as a result this await can't fail.
 2011164        await _connectTask!.ConfigureAwait(false);
 1165
 1166        try
 2011167        {
 29781168            while (!cancellationToken.IsCancellationRequested)
 29761169            {
 29761170                ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 29761171                    IceDefinitions.PrologueSize,
 29761172                    cancellationToken).ConfigureAwait(false);
 1173
 1174                // First decode and check the prologue.
 1175
 28041176                ReadOnlySequence<byte> prologueBuffer = buffer.Slice(0, IceDefinitions.PrologueSize);
 1177
 28041178                IcePrologue prologue =
 56081179                    prologueBuffer.DecodeIceBuffer((ref IceDecoder decoder) => new IcePrologue(ref decoder));
 1180
 28041181                _duplexConnectionReader.AdvanceTo(prologueBuffer.End);
 1182
 28041183                IceDefinitions.CheckPrologue(prologue);
 28031184                if (prologue.FrameSize > _maxFrameSize)
 11185                {
 11186                    throw new InvalidDataException(
 11187                        $"Received frame with size ({prologue.FrameSize}) greater than max frame size.");
 1188                }
 1189
 28021190                if (prologue.CompressionStatus == 2)
 01191                {
 1192                    // The exception handler calls ReadFailed.
 01193                    throw new IceRpcException(
 01194                        IceRpcError.ConnectionAborted,
 01195                        "The connection was aborted because it received a compressed ice frame, and IceRPC does not supp
 1196                }
 1197
 1198                // Then process the frame based on its type.
 28021199                switch (prologue.FrameType)
 1200                {
 1201                    case IceFrameType.CloseConnection:
 251202                    {
 251203                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01204                        {
 01205                            throw new InvalidDataException(
 01206                                $"Received {nameof(IceFrameType.CloseConnection)} frame with unexpected data.");
 1207                        }
 1208
 1209                        lock (_mutex)
 251210                        {
 251211                            RefuseNewInvocations(
 251212                                "The connection was shut down because it received a CloseConnection frame from the peer.
 1213
 1214                            // By exiting the "read frames loop" below, we are refusing new dispatches as well.
 1215
 1216                            // Only one side sends the CloseConnection frame.
 251217                            _sendCloseConnectionFrame = false;
 251218                        }
 1219
 1220                        // Even though we're in the "read frames loop", it's ok to cancel CTS and a "synchronous" TCS
 1221                        // below. We won't be reading anything else so it's ok to run continuations synchronously.
 1222
 1223                        // Abort two-way invocations that are waiting for a response (it will never come).
 1224                        // We use InvocationCanceled (not ConnectionAborted) because the ice protocol guarantees the
 1225                        // peer has sent responses for all two-way requests it accepted before sending CloseConnection.
 1226                        // These pending two-way requests were never processed by the peer, so it's safe for a retry
 1227                        // interceptor to retry them unconditionally.
 251228                        AbortTwowayInvocations(
 251229                            IceRpcError.InvocationCanceled,
 251230                            "The invocation was canceled by the shutdown of the peer.");
 1231
 1232                        // Cancel two-way dispatches since the peer is not interested in the responses. This does not
 1233                        // cancel ongoing writes to _duplexConnection: we don't send incomplete/invalid data.
 251234                        _twowayDispatchesCts.Cancel();
 1235
 1236                        // We keep sending heartbeats. If the shutdown request / shutdown is not fulfilled quickly, they
 1237                        // tell the peer we're still alive and maybe stuck waiting for invocations and dispatches to
 1238                        // complete.
 1239
 1240                        // We request a shutdown that will dispose _duplexConnection once all invocations and dispatches
 1241                        // have completed.
 251242                        _shutdownRequestedTcs.TrySetResult();
 251243                        return;
 1244                    }
 1245
 1246                    case IceFrameType.Request:
 13931247                        await ReadRequestAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 13931248                        break;
 1249
 1250                    case IceFrameType.RequestBatch:
 1251                        // The exception handler calls ReadFailed.
 01252                        throw new IceRpcException(
 01253                            IceRpcError.ConnectionAborted,
 01254                            "The connection was aborted because it received a batch request, and IceRPC does not support
 1255
 1256                    case IceFrameType.Reply:
 13731257                        await ReadReplyAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 13731258                        break;
 1259
 1260                    case IceFrameType.ValidateConnection:
 111261                    {
 111262                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01263                        {
 01264                            throw new InvalidDataException(
 01265                                $"Received {nameof(IceFrameType.ValidateConnection)} frame with unexpected data.");
 1266                        }
 111267                        break;
 1268                    }
 1269
 1270                    default:
 01271                    {
 01272                        throw new InvalidDataException(
 01273                            $"Received Ice frame with unknown frame type '{prologue.FrameType}'.");
 1274                    }
 1275                }
 27771276            } // while
 21277        }
 701278        catch (OperationCanceledException)
 701279        {
 1280            // canceled by DisposeAsync, no need to throw anything
 701281        }
 1021282        catch (IceRpcException exception) when (
 1021283            exception.IceRpcError == IceRpcError.ConnectionAborted &&
 1021284            _dispatchesAndInvocationsCompleted.Task.IsCompleted)
 481285        {
 1286            // The peer acknowledged receipt of the CloseConnection frame by aborting the duplex connection. Return.
 1287            // See ShutdownAsync.
 481288        }
 541289        catch (IceRpcException exception)
 541290        {
 541291            ReadFailed(exception);
 541292            throw;
 1293        }
 21294        catch (InvalidDataException exception)
 21295        {
 21296            ReadFailed(exception);
 21297            throw new IceRpcException(
 21298                IceRpcError.ConnectionAborted,
 21299                "The connection was aborted by an ice protocol error.",
 21300                exception);
 1301        }
 01302        catch (Exception exception)
 01303        {
 01304            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01305            ReadFailed(exception);
 01306            throw;
 1307        }
 1308
 1309        // Aborts all pending two-way invocations. Must be called outside the mutex lock after setting
 1310        // _refuseInvocations to true.
 1311        void AbortTwowayInvocations(IceRpcError error, string message, Exception? exception = null)
 811312        {
 811313            Debug.Assert(_refuseInvocations);
 1314
 1315            // _twowayInvocations is immutable once _refuseInvocations is true.
 2671316            foreach (TaskCompletionSource<PipeReader> responseCompletionSource in _twowayInvocations.Values)
 121317            {
 1318                // _twowayInvocations can hold completed completion sources.
 121319                _ = responseCompletionSource.TrySetException(new IceRpcException(error, message, exception));
 121320            }
 811321        }
 1322
 1323        // Takes appropriate action after a read failure.
 1324        void ReadFailed(Exception exception)
 561325        {
 1326            // We also prevent new one-way invocations even though they don't need to read the connection.
 561327            RefuseNewInvocations("The connection was lost because a read operation failed.");
 1328
 1329            // It's ok to cancel CTS and a "synchronous" TCS below. We won't be reading anything else so it's ok to run
 1330            // continuations synchronously.
 1331
 561332            AbortTwowayInvocations(
 561333                IceRpcError.ConnectionAborted,
 561334                "The invocation was aborted because the connection was lost.",
 561335                exception);
 1336
 1337            // ReadFailed is called when the connection is dead or the peer sent us a non-supported frame (e.g. a
 1338            // batch request). We don't need to allow outstanding two-way dispatches to complete in these situations, so
 1339            // we cancel them to speed-up the shutdown.
 561340            _twowayDispatchesCts.Cancel();
 1341
 1342            lock (_mutex)
 561343            {
 1344                // Don't send a close connection frame since we can't wait for the peer's acknowledgment.
 561345                _sendCloseConnectionFrame = false;
 561346            }
 1347
 561348            _ = _shutdownRequestedTcs.TrySetResult();
 561349        }
 1451350    }
 1351
 1352    /// <summary>Reads a reply (incoming response) and completes the invocation response completion source with this
 1353    /// response. This method executes "synchronously" in the read frames loop.</summary>
 1354    private async Task ReadReplyAsync(int replyFrameSize, CancellationToken cancellationToken)
 13731355    {
 1356        // Read the remainder of the frame immediately into frameReader.
 13731357        PipeReader replyFrameReader = await CreateFrameReaderAsync(
 13731358            replyFrameSize - IceDefinitions.PrologueSize,
 13731359            cancellationToken).ConfigureAwait(false);
 1360
 13731361        bool completeFrameReader = true;
 1362
 1363        try
 13731364        {
 1365            // Read and decode request ID
 13731366            if (!replyFrameReader.TryRead(out ReadResult readResult) || readResult.Buffer.Length < 4)
 01367            {
 01368                throw new InvalidDataException("Received a response with an invalid request ID.");
 1369            }
 1370
 13731371            ReadOnlySequence<byte> requestIdBuffer = readResult.Buffer.Slice(0, 4);
 27461372            int requestId = requestIdBuffer.DecodeIceBuffer((ref IceDecoder decoder) => decoder.DecodeInt());
 13731373            replyFrameReader.AdvanceTo(requestIdBuffer.End);
 1374
 1375            lock (_mutex)
 13731376            {
 13731377                if (_twowayInvocations.TryGetValue(
 13731378                    requestId,
 13731379                    out TaskCompletionSource<PipeReader>? responseCompletionSource))
 3691380                {
 1381                    // continuation runs asynchronously
 3691382                    if (responseCompletionSource.TrySetResult(replyFrameReader))
 3691383                    {
 3691384                        completeFrameReader = false;
 3691385                    }
 1386                    // else this invocation just completed and is about to remove itself from _twowayInvocations,
 1387                    // or _twowayInvocations is immutable and contains entries for completed invocations.
 3691388                }
 1389                // else the request ID carried by the response is bogus or corresponds to a request that was previously
 1390                // discarded (for example, because its deadline expired).
 13731391            }
 13731392        }
 1393        finally
 13731394        {
 13731395            if (completeFrameReader)
 10041396            {
 10041397                replyFrameReader.Complete();
 10041398            }
 13731399        }
 13731400    }
 1401
 1402    /// <summary>Reads and then dispatches an incoming request in a separate dispatch task. This method executes
 1403    /// "synchronously" in the read frames loop.</summary>
 1404    private async Task ReadRequestAsync(int requestFrameSize, CancellationToken cancellationToken)
 13931405    {
 1406        // Read the request frame.
 13931407        PipeReader requestFrameReader = await CreateFrameReaderAsync(
 13931408            requestFrameSize - IceDefinitions.PrologueSize,
 13931409            cancellationToken).ConfigureAwait(false);
 1410
 1411        // Decode its header.
 1412        int requestId;
 1413        IceRequestHeader requestHeader;
 13931414        PipeReader? contextReader = null;
 1415        IDictionary<RequestFieldKey, ReadOnlySequence<byte>>? fields;
 13931416        Task? dispatchTask = null;
 1417
 1418        try
 13931419        {
 13931420            if (!requestFrameReader.TryRead(out ReadResult readResult))
 01421            {
 01422                throw new InvalidDataException("Received an invalid request frame.");
 1423            }
 1424
 13931425            Debug.Assert(readResult.IsCompleted);
 1426
 13931427            (requestId, requestHeader, contextReader, int consumed) = DecodeRequestIdAndHeader(readResult.Buffer);
 13931428            requestFrameReader.AdvanceTo(readResult.Buffer.GetPosition(consumed));
 1429
 13931430            if (contextReader is null)
 13861431            {
 13861432                fields = requestHeader.OperationMode == OperationMode.Normal ?
 13861433                    ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty : _idempotentFields;
 13861434            }
 1435            else
 71436            {
 71437                contextReader.TryRead(out ReadResult result);
 71438                Debug.Assert(result.Buffer.Length > 0 && result.IsCompleted);
 71439                fields = new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>()
 71440                {
 71441                    [RequestFieldKey.Context] = result.Buffer
 71442                };
 1443
 71444                if (requestHeader.OperationMode != OperationMode.Normal)
 01445                {
 1446                    // OperationMode can be Idempotent or Nonmutating.
 01447                    fields[RequestFieldKey.Idempotent] = default;
 01448                }
 71449            }
 1450
 13931451            bool releaseDispatchSemaphore = false;
 13931452            if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 13931453            {
 1454                // This prevents us from receiving any new frames if we're already dispatching the maximum number
 1455                // of requests. We need to do this in the "accept from network loop" to apply back pressure to the
 1456                // caller.
 1457                try
 13931458                {
 13931459                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 13921460                    releaseDispatchSemaphore = true;
 13921461                }
 11462                catch (OperationCanceledException)
 11463                {
 1464                    // and return below
 11465                }
 13931466            }
 1467
 1468            lock (_mutex)
 13931469            {
 13931470                if (_shutdownTask is not null)
 21471                {
 1472                    // The connection is (being) disposed or the connection is shutting down and received a request.
 1473                    // We simply discard it. For a graceful shutdown, the two-way invocation in the peer will throw
 1474                    // IceRpcException(InvocationCanceled). We also discard one-way requests: if we accepted them, they
 1475                    // could delay our shutdown and make it time out.
 21476                    if (releaseDispatchSemaphore)
 11477                    {
 11478                        _dispatchSemaphore!.Release();
 11479                    }
 21480                    return;
 1481                }
 1482
 13911483                IncrementDispatchInvocationCount();
 13911484            }
 1485
 1486            // The scheduling of the task can't be canceled since we want to make sure DispatchRequestAsync will
 1487            // cleanup (decrement _dispatchCount etc.) if DisposeAsync is called. dispatchTask takes ownership of the
 1488            // requestFrameReader and contextReader.
 13911489            dispatchTask = Task.Run(
 13911490                async () =>
 13911491                {
 13911492                    using var request = new IncomingRequest(Protocol.Ice, _connectionContext!)
 13911493                    {
 13911494                        Fields = fields,
 13911495                        Fragment = requestHeader.Facet.ToFragment(),
 13911496                        IsOneway = requestId == 0,
 13911497                        Operation = requestHeader.Operation,
 13911498                        Path = requestHeader.Identity.ToPath(),
 13911499                        Payload = requestFrameReader,
 13911500                    };
 13911501
 13911502                    try
 13911503                    {
 13911504                        await DispatchRequestAsync(
 13911505                            request,
 13911506                            requestId,
 13911507                            contextReader).ConfigureAwait(false);
 13911508                    }
 01509                    catch (IceRpcException)
 01510                    {
 13911511                        // expected when the peer aborts the connection.
 01512                    }
 01513                    catch (Exception exception)
 01514                    {
 13911515                        // With ice, a dispatch cannot throw an exception that comes from the application code:
 13911516                        // any exception thrown when reading the response payload is converted into a DispatchException
 13911517                        // response, and the response header has no fields to encode.
 01518                        Debug.Fail($"ice dispatch {request} failed with an unexpected exception: {exception}");
 01519                        throw;
 13911520                    }
 13911521                },
 13911522                CancellationToken.None);
 13911523        }
 1524        finally
 13931525        {
 13931526            if (dispatchTask is null)
 21527            {
 21528                requestFrameReader.Complete();
 21529                contextReader?.Complete();
 21530            }
 13931531        }
 13931532    }
 1533
 1534    private void RefuseNewInvocations(string message)
 3801535    {
 1536        lock (_mutex)
 3801537        {
 3801538            _refuseInvocations = true;
 3801539            _invocationRefusedMessage ??= message;
 3801540        }
 3801541    }
 1542
 1543    /// <summary>Sends a control frame. It takes care of acquiring and releasing the write lock and calls
 1544    /// <see cref="WriteFailed" /> if a failure occurs while writing to _duplexConnectionWriter.</summary>
 1545    /// <param name="encode">Encodes the control frame.</param>
 1546    /// <param name="cancellationToken">The cancellation token.</param>
 1547    /// <remarks>If the cancellation token is canceled while writing to the duplex connection, the connection is
 1548    /// aborted.</remarks>
 1549    private async ValueTask SendControlFrameAsync(
 1550        Action<IBufferWriter<byte>> encode,
 1551        CancellationToken cancellationToken)
 1421552    {
 1421553        using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1554
 1555        try
 1411556        {
 1411557            encode(_duplexConnectionWriter);
 1411558            await _duplexConnectionWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
 1381559        }
 31560        catch (Exception exception)
 31561        {
 31562            WriteFailed(exception);
 31563            throw;
 1564        }
 1381565    }
 1566
 1567    /// <summary>Takes appropriate action after a write failure.</summary>
 1568    /// <remarks>Must be called outside the mutex lock but after acquiring _writeSemaphore.</remarks>
 1569    private void WriteFailed(Exception exception)
 51570    {
 51571        Debug.Assert(_writeException is null);
 51572        _writeException = exception; // protected by _writeSemaphore
 1573
 1574        // We can't send new invocations without writing to the connection.
 51575        RefuseNewInvocations("The connection was lost because a write operation failed.");
 1576
 1577        // We can't send responses so these dispatches can be canceled.
 51578        _twowayDispatchesCts.Cancel();
 1579
 1580        // We don't change _sendClosedConnectionFrame. If the _readFrameTask is still running, we want ShutdownAsync
 1581        // to send CloseConnection - and fail.
 1582
 51583        _ = _shutdownRequestedTcs.TrySetResult();
 51584    }
 1585}