< Summary

Information
Class: IceRpc.Transports.Tcp.Internal.TcpConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Tcp/Internal/TcpConnection.cs
Tag: 592_20856082467
Line coverage
75%
Covered lines: 116
Uncovered lines: 37
Coverable lines: 153
Total lines: 411
Line coverage: 75.8%
Branch coverage
77%
Covered branches: 28
Total branches: 36
Branch coverage: 77.7%
Method coverage
90%
Covered methods: 9
Total methods: 10
Method coverage: 90%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 2 | 2 | 100%
ConnectAsync(...) | 100% | 1 | 1 | 100%
Dispose() | 100% | 6 | 6 | 100%
ReadAsync(...) | 50% | 2 | 2 | 100%
PerformReadAsync() | 100% | 2 | 2 | 100%
ShutdownWriteAsync(...) | 100% | 1 | 1 | 100%
PerformShutdownAsync() | 100% | 2.35 | 2 | 55.55%
WriteAsync(...) | 100% | 1 | 1 | 100%
PerformWriteAsync() | 68.18% | 30.82 | 22 | 73.68%
AbortAndObserveAsync() | 100% | 2 | 1 | 0%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Tcp/Internal/TcpConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.Net;
 6using System.Net.Security;
 7using System.Net.Sockets;
 8using System.Runtime.InteropServices;
 9
 10namespace IceRpc.Transports.Tcp.Internal;
 11
/// <summary>Implements <see cref="IDuplexConnection" /> for tcp with or without TLS.</summary>
/// <remarks>Unlike Coloc, the Tcp transport is not a "checked" transport, which means it does not need to detect
/// violations of the duplex transport contract or report such violations. It assumes its clients are sufficiently well
/// tested to never violate this contract. As a result, this implementation does not throw
/// <see cref="InvalidOperationException" />.</remarks>
internal abstract class TcpConnection : IDuplexConnection
{
    /// <summary>Gets the socket used for all reads and writes that do not go through <see cref="SslStream" />, and
    /// for shutting down and closing the connection.</summary>
    internal abstract Socket Socket { get; }

    /// <summary>Gets the SSL stream used for reads, writes and shutdown, or <see langword="null" /> when I/O goes
    /// directly through <see cref="Socket" />.</summary>
    internal abstract SslStream? SslStream { get; }

    // Set by Dispose; checked at the start of every public operation.
    private protected volatile bool _isDisposed;

    // The MaxDataSize of the SSL implementation.
    private const int MaxSslDataSize = 16 * 1024;

    // Set once ShutdownWriteAsync completes successfully; Dispose reads it to pick a graceful close over an abort.
    private bool _isShutdown;

    // Upper bound on the number of bytes coalesced into the write buffer by WriteAsync (SSL only).
    private readonly int _maxSslBufferSize;

    // Scratch list reused by WriteAsync to hand a multi-segment buffer to Socket.SendAsync (non-SSL only).
    private readonly List<ArraySegment<byte>> _segments = new();

    // Buffer used to coalesce leading segments in WriteAsync (SSL only); null when no buffer was supplied.
    private readonly IMemoryOwner<byte>? _writeBufferOwner;

    /// <summary>Connects this connection by delegating to <see cref="ConnectAsyncCore" />.</summary>
    /// <param name="cancellationToken">A cancellation token forwarded to <see cref="ConnectAsyncCore" />.</param>
    /// <returns>The task returned by <see cref="ConnectAsyncCore" />.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);
        return ConnectAsyncCore(cancellationToken);
    }

    /// <summary>Disposes the SSL stream (if any), closes the socket and releases the write buffer. Calls after the
    /// first one are no-ops.</summary>
    public void Dispose()
    {
        if (_isDisposed)
        {
            // Already disposed: in particular, don't dispose the rented write buffer a second time, since a
            // MemoryPool implementation is not required to tolerate that.
            return;
        }
        _isDisposed = true;

        if (SslStream is SslStream sslStream)
        {
            sslStream.Dispose();
        }

        // If shutdown was called, we can just dispose the connection to complete the graceful TCP closure. Otherwise,
        // we abort the TCP connection to ensure the connection doesn't end up in the TIME_WAIT state.
        if (_isShutdown)
        {
            Socket.Dispose();
        }
        else
        {
            Socket.Close(0);
        }
        _writeBufferOwner?.Dispose();
    }

    /// <summary>Reads bytes into <paramref name="buffer" /> from the SSL stream when there is one, and from the
    /// socket otherwise.</summary>
    /// <param name="buffer">The destination buffer. It must not be empty.</param>
    /// <param name="cancellationToken">A cancellation token passed to the underlying read.</param>
    /// <returns>The number of bytes read, as reported by the underlying read.</returns>
    /// <exception cref="ArgumentException">Thrown if <paramref name="buffer" /> is empty.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        return buffer.Length > 0 ? PerformReadAsync() :
            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));

        async ValueTask<int> PerformReadAsync()
        {
            try
            {
                // Read through the stream captured by the pattern so that the check and the call use the same
                // instance.
                return SslStream is SslStream sslStream ?
                    await sslStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false) :
                    await Socket.ReceiveAsync(buffer, SocketFlags.None, cancellationToken).ConfigureAwait(false);
            }
            catch (IOException exception)
            {
                throw exception.ToIceRpcException();
            }
            catch (SocketException exception)
            {
                throw exception.ToIceRpcException();
            }
        }
    }

    /// <summary>Shuts down the write side of the connection: first the SSL stream (if any), then the send side of
    /// the socket.</summary>
    /// <param name="cancellationToken">A cancellation token that stops the wait for the SSL shutdown. When it
    /// fires, the socket is aborted before the cancellation is rethrown.</param>
    /// <returns>A task that completes once the send side of the socket is shut down.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        return PerformShutdownAsync();

        async Task PerformShutdownAsync()
        {
            try
            {
                if (SslStream is SslStream sslStream)
                {
                    // SslStream.ShutdownAsync takes no cancellation token, so we wait on the task with WaitAsync and
                    // abort the socket if the wait is canceled.
                    Task shutdownTask = sslStream.ShutdownAsync();

                    try
                    {
                        await shutdownTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        await AbortAndObserveAsync(shutdownTask).ConfigureAwait(false);
                        throw;
                    }
                }

                // Shutdown the socket send side to send a TCP FIN packet. We don't close the read side because we want
                // to be notified when the peer shuts down its side of the socket (through the ReceiveAsync call).
                Socket.Shutdown(SocketShutdown.Send);

                // If shutdown is successful mark the connection as shutdown to ensure Dispose won't reset the TCP
                // connection.
                _isShutdown = true;
            }
            catch (IOException exception)
            {
                throw exception.ToIceRpcException();
            }
            catch (SocketException exception)
            {
                throw exception.ToIceRpcException();
            }
        }
    }

    /// <summary>Writes <paramref name="buffer" /> to the SSL stream when there is one, and to the socket
    /// otherwise.</summary>
    /// <param name="buffer">The bytes to write. Without SSL, a multi-segment buffer must be backed by arrays.
    /// </param>
    /// <param name="cancellationToken">A cancellation token for the write. For a multi-segment write without SSL,
    /// cancellation aborts the socket before the cancellation is rethrown.</param>
    /// <returns>A value task that completes when the write completes.</returns>
    /// <exception cref="ArgumentException">Thrown if, without SSL, a segment of a multi-segment
    /// <paramref name="buffer" /> is not backed by an array.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);
        return PerformWriteAsync();

        async ValueTask PerformWriteAsync()
        {
            try
            {
                if (SslStream is SslStream sslStream)
                {
                    if (buffer.IsSingleSegment)
                    {
                        await sslStream.WriteAsync(buffer.First, cancellationToken).ConfigureAwait(false);
                    }
                    else
                    {
                        // Coalesce leading segments up to _maxSslBufferSize. We don't coalesce trailing segments as we
                        // assume these segments are large enough.
                        int leadingSize = 0;
                        int leadingSegmentCount = 0;
                        foreach (ReadOnlyMemory<byte> memory in buffer)
                        {
                            if (leadingSize + memory.Length <= _maxSslBufferSize)
                            {
                                leadingSize += memory.Length;
                                leadingSegmentCount++;
                            }
                            else
                            {
                                break;
                            }
                        }

                        if (leadingSegmentCount > 1)
                        {
                            ReadOnlySequence<byte> leading = buffer.Slice(0, leadingSize);
                            buffer = buffer.Slice(leadingSize); // buffer can become empty

                            Debug.Assert(_writeBufferOwner is not null);
                            Memory<byte> writeBuffer = _writeBufferOwner.Memory[0..leadingSize];
                            leading.CopyTo(writeBuffer.Span);

                            // Send the "coalesced" leading segments
                            await sslStream.WriteAsync(writeBuffer, cancellationToken).ConfigureAwait(false);
                        }
                        // else no need to coalesce (copy) a single segment

                        // Send the remaining segments one by one
                        if (buffer.IsEmpty)
                        {
                            // done
                        }
                        else if (buffer.IsSingleSegment)
                        {
                            await sslStream.WriteAsync(buffer.First, cancellationToken).ConfigureAwait(false);
                        }
                        else
                        {
                            foreach (ReadOnlyMemory<byte> memory in buffer)
                            {
                                await sslStream.WriteAsync(memory, cancellationToken).ConfigureAwait(false);
                            }
                        }
                    }
                }
                else
                {
                    if (buffer.IsSingleSegment)
                    {
                        _ = await Socket.SendAsync(buffer.First, SocketFlags.None, cancellationToken)
                            .ConfigureAwait(false);
                    }
                    else
                    {
                        // Gather the segments so that they can be sent with a single SendAsync call.
                        _segments.Clear();
                        foreach (ReadOnlyMemory<byte> memory in buffer)
                        {
                            if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
                            {
                                _segments.Add(segment);
                            }
                            else
                            {
                                throw new ArgumentException(
                                    $"The {nameof(buffer)} must be backed by arrays.",
                                    nameof(buffer));
                            }
                        }

                        // This SendAsync overload takes no cancellation token, so we wait on the task with WaitAsync
                        // and abort the socket if the wait is canceled.
                        Task sendTask = Socket.SendAsync(_segments, SocketFlags.None);

                        try
                        {
                            await sendTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                        }
                        catch (OperationCanceledException)
                        {
                            await AbortAndObserveAsync(sendTask).ConfigureAwait(false);
                            throw;
                        }
                    }
                }
            }
            catch (IOException exception)
            {
                throw exception.ToIceRpcException();
            }
            catch (SocketException exception)
            {
                throw exception.ToIceRpcException();
            }
        }
    }

    /// <summary>Constructs a tcp connection.</summary>
    /// <param name="memoryOwner">The buffer used to coalesce leading segments in <see cref="WriteAsync" />, or
    /// <see langword="null" />. This connection disposes it in <see cref="Dispose" />.</param>
    private protected TcpConnection(IMemoryOwner<byte>? memoryOwner)
    {
        _writeBufferOwner = memoryOwner;
        // When coalescing leading buffers in WriteAsync (SSL only), the upper size limit is the lesser of the size of
        // the buffer we rented from the memory pool (typically 4K) and MaxSslDataSize (16K).
        _maxSslBufferSize = Math.Min(memoryOwner?.Memory.Length ?? 0, MaxSslDataSize);
    }

    /// <summary>Performs the connection establishment on behalf of <see cref="ConnectAsync" />.</summary>
    /// <param name="cancellationToken">The cancellation token given to <see cref="ConnectAsync" />.</param>
    /// <returns>The transport connection information returned by <see cref="ConnectAsync" />.</returns>
    private protected abstract Task<TransportConnectionInformation> ConnectAsyncCore(
        CancellationToken cancellationToken);

    /// <summary>Aborts the connection and then observes the exception of the provided task.</summary>
    /// <param name="task">The pending operation whose outcome is awaited and discarded.</param>
    private async Task AbortAndObserveAsync(Task task)
    {
        Socket.Close(0);
        try
        {
            await task.ConfigureAwait(false);
        }
        catch
        {
            // observe exception
        }
    }
}
 271
/// <summary>The client side of a tcp connection: it creates and configures its own socket, connects it to the
/// server address and, when client authentication options are supplied, authenticates as an SSL client.</summary>
internal class TcpClientConnection : TcpConnection
{
    internal override Socket Socket { get; }

    internal override SslStream? SslStream => _sslStream;

    // The address to connect to: an IPEndPoint when the host parses as an IP address, a DnsEndPoint otherwise.
    private readonly EndPoint _addr;
    private readonly SslClientAuthenticationOptions? _authenticationOptions;

    // Created by ConnectAsyncCore when _authenticationOptions is not null.
    private SslStream? _sslStream;

    /// <summary>Constructs a client connection and its socket. The socket is not connected until
    /// <see cref="TcpConnection.ConnectAsync" /> is called.</summary>
    /// <param name="serverAddress">The address of the server; its host and port select the remote endpoint.</param>
    /// <param name="authenticationOptions">The SSL client authentication options, or <see langword="null" /> to
    /// connect without SSL.</param>
    /// <param name="pool">The memory pool a write buffer is rented from when
    /// <paramref name="authenticationOptions" /> is not <see langword="null" />.</param>
    /// <param name="minimumSegmentSize">The minimum size of the rented write buffer.</param>
    /// <param name="options">The transport options used to bind and configure the socket.</param>
    internal TcpClientConnection(
        ServerAddress serverAddress,
        SslClientAuthenticationOptions? authenticationOptions,
        MemoryPool<byte> pool,
        int minimumSegmentSize,
        TcpClientTransportOptions options)
        : base(authenticationOptions is not null ? pool.Rent(minimumSegmentSize) : null)
    {
        _addr = IPAddress.TryParse(serverAddress.Host, out IPAddress? ipAddress) ?
            new IPEndPoint(ipAddress, serverAddress.Port) :
            new DnsEndPoint(serverAddress.Host, serverAddress.Port);

        _authenticationOptions = authenticationOptions;

        // When using IPv6 address family we use the socket constructor without AddressFamily parameter to ensure
        // dual-mode sockets are used on platforms that support them.
        Socket = ipAddress?.AddressFamily == AddressFamily.InterNetwork ?
            new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp) :
            new Socket(SocketType.Stream, ProtocolType.Tcp);

        try
        {
            if (options.LocalNetworkAddress is IPEndPoint localNetworkAddress)
            {
                Socket.Bind(localNetworkAddress);
            }

            Socket.Configure(options);
        }
        catch (SocketException exception)
        {
            // The caller never receives this instance, so release everything it owns here. Dispose closes the
            // socket and also returns the write buffer rented by the base constructor; disposing only the socket
            // would leave that buffer unreturned.
            Dispose();
            throw exception.ToIceRpcException();
        }
        catch
        {
            // Same cleanup as above for any other failure.
            Dispose();
            throw;
        }
    }

    /// <summary>Connects the socket to the server address, then authenticates as an SSL client when authentication
    /// options were supplied.</summary>
    /// <param name="cancellationToken">A cancellation token for the connect and the SSL authentication.</param>
    /// <returns>The local and remote endpoints of the socket and the remote certificate of the SSL stream, if any.
    /// </returns>
    private protected override async Task<TransportConnectionInformation> ConnectAsyncCore(
        CancellationToken cancellationToken)
    {
        try
        {
            Debug.Assert(Socket is not null);

            // Connect to the peer.
            await Socket.ConnectAsync(_addr, cancellationToken).ConfigureAwait(false);

            // Workaround: a canceled Socket.ConnectAsync call can return successfully but the Socket is closed because
            // of the cancellation. See https://github.com/dotnet/runtime/issues/75889.
            cancellationToken.ThrowIfCancellationRequested();

            if (_authenticationOptions is not null)
            {
                // Neither stream owns the socket: it is closed by Dispose.
                _sslStream = new SslStream(new NetworkStream(Socket, false), false);

                await _sslStream.AuthenticateAsClientAsync(
                    _authenticationOptions,
                    cancellationToken).ConfigureAwait(false);
            }

            return new TransportConnectionInformation(
                localNetworkAddress: Socket.LocalEndPoint!,
                remoteNetworkAddress: Socket.RemoteEndPoint!,
                _sslStream?.RemoteCertificate);
        }
        catch (IOException exception)
        {
            throw exception.ToIceRpcException();
        }
        catch (SocketException exception)
        {
            throw exception.ToIceRpcException();
        }
    }
}
 362
/// <summary>The server side of a tcp connection: it wraps a socket supplied by its creator and, when server
/// authentication options are supplied, authenticates as an SSL server during connection establishment.</summary>
internal class TcpServerConnection : TcpConnection
{
    internal override Socket Socket { get; }

    internal override SslStream? SslStream
    {
        get { return _sslStream; }
    }

    private readonly SslServerAuthenticationOptions? _authenticationOptions;

    // Created by ConnectAsyncCore when _authenticationOptions is not null.
    private SslStream? _sslStream;

    /// <summary>Constructs a server connection over <paramref name="socket" />.</summary>
    /// <param name="socket">The socket used by this connection.</param>
    /// <param name="authenticationOptions">The SSL server authentication options, or <see langword="null" /> to
    /// skip SSL.</param>
    /// <param name="pool">The memory pool a write buffer is rented from when
    /// <paramref name="authenticationOptions" /> is not <see langword="null" />.</param>
    /// <param name="minimumSegmentSize">The minimum size of the rented write buffer.</param>
    internal TcpServerConnection(
        Socket socket,
        SslServerAuthenticationOptions? authenticationOptions,
        MemoryPool<byte> pool,
        int minimumSegmentSize)
        : base(authenticationOptions is null ? null : pool.Rent(minimumSegmentSize))
    {
        _authenticationOptions = authenticationOptions;
        Socket = socket;
    }

    /// <summary>Authenticates as an SSL server when authentication options were supplied, then reports the
    /// endpoints of the socket.</summary>
    /// <param name="cancellationToken">A cancellation token for the SSL authentication.</param>
    /// <returns>The local and remote endpoints of the socket and the remote certificate of the SSL stream, if any.
    /// </returns>
    private protected override async Task<TransportConnectionInformation> ConnectAsyncCore(
        CancellationToken cancellationToken)
    {
        try
        {
            if (_authenticationOptions is SslServerAuthenticationOptions serverOptions)
            {
                // This can only be created with a connected socket.
                var networkStream = new NetworkStream(Socket, ownsSocket: false);
                _sslStream = new SslStream(networkStream, leaveInnerStreamOpen: false);

                await _sslStream
                    .AuthenticateAsServerAsync(serverOptions, cancellationToken)
                    .ConfigureAwait(false);
            }

            EndPoint localEndPoint = Socket.LocalEndPoint!;
            EndPoint remoteEndPoint = Socket.RemoteEndPoint!;

            return new TransportConnectionInformation(
                localNetworkAddress: localEndPoint,
                remoteNetworkAddress: remoteEndPoint,
                _sslStream?.RemoteCertificate);
        }
        catch (IOException ioException)
        {
            throw ioException.ToIceRpcException();
        }
        catch (SocketException socketException)
        {
            throw socketException.ToIceRpcException();
        }
    }
}