< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 278_19370051549
Line coverage
82%
Covered lines: 110
Uncovered lines: 23
Coverable lines: 133
Total lines: 274
Line coverage: 82.7%
Branch coverage
81%
Covered branches: 39
Total branches: 48
Branch coverage: 81.2%
Method coverage
100%
Covered methods: 8
Total methods: 8
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
Dispose() | 100% | 1 | 1 | 100%
ReadAsync() | 88.88% | 18.63 | 18 | 87.5%
CopySegmentToMemory() | 100% | 2 | 2 | 100%
ShutdownWriteAsync(...) | 83.33% | 6.11 | 6 | 85.71%
WriteAsync() | 75% | 13.83 | 12 | 76.66%
.ctor(...) | 100% | 1 | 1 | 100%
Dispose(...) | 75% | 9.3 | 8 | 72.72%
FinishConnect() | 50% | 2.15 | 2 | 66.66%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
       // The pipe reader that ReadAsync reads from. ReadAsync, ShutdownWriteAsync and WriteAsync reject calls made while
       // it is null, reporting that the connection is not connected yet.
 15    private protected PipeReader? _reader;
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
 20    private readonly ServerAddress _serverAddress;
       // The pipe writer that WriteAsync writes to; completed by ShutdownWriteAsync or by Dispose.
 21    private readonly PipeWriter _writer;
 22
       /// <summary>Establishes the connection. Implemented by the client and server derived classes.</summary>
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
 24
       /// <summary>Disposes the connection by calling <c>Dispose(true)</c>, then suppresses finalization.</summary>
 25    public void Dispose()
 225226    {
 225227        Dispose(true);
 225228        GC.SuppressFinalize(this);
 225229    }
 30
       /// <summary>Reads from the pipe reader and copies at most <c>buffer.Length</c> bytes into
       /// <paramref name="buffer" />.</summary>
       /// <returns>The number of bytes copied, or 0 when the pipe reader is completed and holds no more data.</returns>
       /// <exception cref="ObjectDisposedException">Thrown if the connection is disposed.</exception>
       /// <exception cref="InvalidOperationException">Thrown if the connection is not connected yet or if another read
       /// is already in progress.</exception>
       /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.OperationAborted" /> if the pending read
       /// was canceled.</exception>
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 12002932    {
 12002933        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 12002735        if (_reader is null)
 236        {
 237            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 38        }
           // The Reading flag rejects concurrent reads; it is cleared in the finally block below.
 12002539        if (!_state.TrySetFlag(State.Reading))
 240        {
 241            throw new InvalidOperationException("Reading is already in progress.");
 42        }
 43
 44        try
 12002345        {
 12002346            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 11881247            if (readResult.IsCanceled)
 048            {
 49                // Dispose canceled ReadAsync.
 050                throw new IceRpcException(IceRpcError.OperationAborted);
 51            }
 11881252            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 26253            {
 26254                return 0;
 55            }
 56
 57            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 58            // if we need this in other places.
 59            int read;
 11855060            if (readResult.Buffer.IsSingleSegment)
 1096661            {
 1096662                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 1096663            }
 64            else
 10758465            {
 10758466                read = 0;
 60257667                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 19370468                {
 19370469                    read += CopySegmentToMemory(segment, buffer[read..]);
                       // Stop as soon as the destination is full; later segments are left in the pipe.
 19370470                    if (read == buffer.Length)
 10758471                    {
 10758472                        break;
 73                    }
 8612074                }
 10758475            }
               // Advance only past the bytes that were copied.
 11855076            _reader.AdvanceTo(readResult.Buffer.GetPosition(read));
 11855077            return read;
 78        }
 79        finally
 12002380        {
               // When a read is in progress, Dispose(bool) only cancels the pending read and leaves the reader
               // uncompleted, so the reader is completed here once the read is over.
 12002381            if (_state.HasFlag(State.Disposed))
 082            {
 083                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 084            }
 12002385            _state.ClearFlag(State.Reading);
 12002386        }
 87
           // Copies as many bytes of source as fit in destination and returns the number of bytes copied.
 88        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 20467089        {
 20467090            if (source.Length > destination.Length)
 10274591            {
 10274592                source[0..destination.Length].CopyTo(destination);
 10274593                return destination.Length;
 94            }
 95            else
 10192596            {
 10192597                source.CopyTo(destination);
 10192598                return source.Length;
 99            }
 204670100        }
 118812101    }
 102
       /// <summary>Shuts down the write side of the connection by completing the pipe writer.</summary>
       /// <returns>A completed task; the method does its work synchronously.</returns>
       /// <exception cref="ObjectDisposedException">Thrown if the connection is disposed.</exception>
       /// <exception cref="InvalidOperationException">Thrown if the connection is not connected yet, if a write is in
       /// progress, or if shutdown was already called.</exception>
 103    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 282104    {
 282105        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 106
 282107        if (_reader is null)
 2108        {
 2109            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 110        }
 280111        if (_state.HasFlag(State.Writing))
 2112        {
 2113            throw new InvalidOperationException("Shutdown or writing is in progress");
 114        }
            // The ShuttingDown flag is never cleared in this class, so a second call always fails here.
 278115        if (!_state.TrySetFlag(State.ShuttingDown))
 0116        {
 0117            throw new InvalidOperationException("Shutdown has already been called.");
 118        }
 119
 278120        _writer.Complete();
 278121        return Task.CompletedTask;
 278122    }
 123
       /// <summary>Writes <paramref name="buffer" /> to the pipe writer and flushes it.</summary>
       /// <exception cref="ObjectDisposedException">Thrown if the connection is disposed.</exception>
       /// <exception cref="ArgumentException">Thrown if <paramref name="buffer" /> is empty.</exception>
       /// <exception cref="InvalidOperationException">Thrown if the connection is not connected yet, if shutdown was
       /// already called, or if another write is in progress.</exception>
       /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.OperationAborted" /> if the pending flush
       /// was canceled.</exception>
 124    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 14559125    {
 14559126        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 127
 14557128        if (buffer.IsEmpty)
 0129        {
 0130            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 131        }
 132
 14557133        if (_reader is null)
 2134        {
 2135            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 136        }
 14555137        if (_state.HasFlag(State.ShuttingDown))
 4138        {
 4139            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 140        }
            // The Writing flag rejects concurrent writes; it is cleared in the finally block below.
 14551141        if (!_state.TrySetFlag(State.Writing))
 2142        {
 2143            throw new InvalidOperationException("Writing is already in progress.");
 144        }
 145
 146        try
 14549147        {
 14549148            _writer.Write(buffer);
 14549149            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 14531150            if (flushResult.IsCanceled)
 0151            {
 152                // Dispose canceled FlushAsync.
 0153                throw new IceRpcException(IceRpcError.OperationAborted);
 154            }
 14531155        }
 156        finally
 14549157        {
 14549158            Debug.Assert(!_state.HasFlag(State.ShuttingDown));
                // When a write is in progress, Dispose(bool) only cancels the pending flush and leaves the writer
                // uncompleted, so the writer is completed here once the write is over.
 14549159            if (_state.HasFlag(State.Disposed))
 0160            {
 0161                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 0162            }
 14549163            _state.ClearFlag(State.Writing);
 14549164        }
 14531165    }
 166
        /// <summary>Constructs a colocated connection that writes to <paramref name="writer" />. The reader is not set
        /// here; derived classes assign <c>_reader</c>.</summary>
 1900167    public ColocConnection(ServerAddress serverAddress, PipeWriter writer)
 1900168    {
 1900169        _serverAddress = serverAddress;
 1900170        _writer = writer;
 1900171    }
 172
        /// <summary>Marks the connection as disposed and completes the pipe reader and writer, or cancels the pending
        /// read/flush when a read/write is in progress. The body runs only when setting the Disposed flag succeeds.
        /// The <paramref name="disposing" /> argument is not used by this implementation.</summary>
 173    private protected virtual void Dispose(bool disposing)
 2252174    {
 2252175        if (_state.TrySetFlag(State.Disposed))
 1898176        {
 177            // _reader can be null if connection establishment failed or didn't run.
 1898178            if (_reader is not null)
 1786179            {
                    // A read in progress is canceled rather than completed here; ReadAsync completes the reader in
                    // its finally block when it observes the Disposed flag.
 1786180                if (_state.HasFlag(State.Reading))
 0181                {
 0182                    _reader.CancelPendingRead();
 0183                }
 184                else
 1786185                {
 1786186                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 1786187                }
 1786188            }
 189
                // Same approach for the writer: WriteAsync completes it in its finally block after the cancellation.
 1898190            if (_state.HasFlag(State.Writing))
 0191            {
 0192                _writer.CancelPendingFlush();
 0193            }
 194            else
 1898195            {
 1898196                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 1898197            }
 1898198        }
 2252199    }
 200
        /// <summary>Completes connection establishment once <c>_reader</c> is set. If the connection was disposed in
        /// the meantime, completes the reader and throws <see cref="ObjectDisposedException" />.</summary>
        /// <returns>A <see cref="TransportConnectionInformation" /> built with the same <c>ColocEndPoint</c> (created
        /// from the server address) as its first two arguments and <c>null</c> as its third.</returns>
 201    private protected TransportConnectionInformation FinishConnect()
 1734202    {
 1734203        Debug.Assert(_reader is not null);
 204
 1734205        if (_state.HasFlag(State.Disposed))
 0206        {
 0207            _reader.Complete();
 0208            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 209        }
 210
 1734211        var colocEndPoint = new ColocEndPoint(_serverAddress);
 1734212        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 1734213    }
 214
        // The flags stored in _state; each value is a distinct bit so that several can be set at the same time.
 215    private protected enum State : int
 216    {
 217        Disposed = 1,
 218        Reading = 2,
 219        ShuttingDown = 4,
 220        Writing = 8,
 221    }
 222}
 223
 224/// <summary>The colocated client connection class.</summary>
 225internal class ClientColocConnection : ColocConnection
 226{
        // The delegate called by ConnectAsync: it receives the reader of the local pipe and returns the reader that
        // this connection reads from.
 227    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
        // The reader of the local pipe. Set to null once _connectAsync has returned successfully; while it is non-null,
        // Dispose completes it.
 228    private PipeReader? _localPipeReader;
 229
        /// <summary>Connects by passing the local pipe reader to the connect delegate and storing the returned reader
        /// in <c>_reader</c>.</summary>
        /// <exception cref="ObjectDisposedException">Thrown if the connection is disposed.</exception>
        /// <exception cref="InvalidOperationException">Thrown if <c>_reader</c> is already set, i.e. a previous call
        /// already connected.</exception>
 230    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 231    {
 232        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 233
 234        if (_reader is not null)
 235        {
 236            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 237        }
 238
 239        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 240
 241        if (_localPipeReader is not null)
 242        {
                // If _connectAsync throws, _localPipeReader keeps its value and is completed later by Dispose.
 243            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 244            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 245        }
 246        return FinishConnect();
 247    }
 248
        /// <summary>Constructs a client connection that writes to <c>localPipe.Writer</c> and keeps
        /// <c>localPipe.Reader</c> to hand over to <paramref name="connectAsync" />.</summary>
 249    internal ClientColocConnection(
 250        ServerAddress serverAddress,
 251        Pipe localPipe,
 252        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 253        : base(serverAddress, localPipe.Writer)
 254    {
 255        _connectAsync = connectAsync;
 256        _localPipeReader = localPipe.Reader;
 257    }
 258
        /// <summary>Runs the base disposal, then completes the local pipe reader if it was not handed over by a
        /// successful <c>ConnectAsync</c>.</summary>
 259    private protected override void Dispose(bool disposing)
 260    {
 261        base.Dispose(disposing);
 262        _localPipeReader?.Complete();
 263    }
 264}
 265
 266/// <summary>The colocated server connection class.</summary>
 267internal class ServerColocConnection : ColocConnection
 268{
        /// <summary>Returns an already-completed task holding the result of <c>FinishConnect</c>. The method is not
        /// async, so an exception thrown by <c>FinishConnect</c> propagates directly from this call instead of being
        /// stored in the returned task.</summary>
 269    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 270        Task.FromResult(FinishConnect());
 271
        /// <summary>Constructs a server connection with both its writer and its reader; <c>_reader</c> is assigned
        /// here, in the constructor.</summary>
 272    public ServerColocConnection(ServerAddress serverAddress, PipeWriter writer, PipeReader reader)
 273       : base(serverAddress, writer) => _reader = reader;
 274}