< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ClientColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 1321_24790053727
Line coverage
100%
Covered lines: 22
Uncovered lines: 0
Coverable lines: 22
Total lines: 279
Line coverage: 100%
Branch coverage
100%
Covered branches: 6
Total branches: 6
Branch coverage: 100%
Method coverage
100%
Covered methods: 3
Fully covered methods: 3
Total methods: 3
Method coverage: 100%
Full method coverage: 100%

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| ConnectAsync() | 100% | 4 | 4 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Dispose(...) | 100% | 2 | 2 | 100% |

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
 15    private protected PipeReader? _reader;
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
 20    private readonly TransportAddress _transportAddress;
 21    private readonly PipeWriter _writer;
 22
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
       // Overridden by ClientColocConnection and ServerColocConnection; both overrides end by calling FinishConnect().
 24
 25    public void Dispose()
 26    {
           // Delegates to the overridable Dispose(bool) so that subclasses can release their own resources (see the
           // ClientColocConnection override), then suppresses finalization for this instance.
 27        Dispose(true);
 28        GC.SuppressFinalize(this);
 29    }
 30
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 32    {
           // Copies bytes made available by the pipe reader into buffer and returns how many bytes were copied. Returns
           // 0 when the reader reports completion with nothing left to read. Throws ObjectDisposedException when the
           // connection is disposed, ArgumentException when buffer is empty, InvalidOperationException when the
           // connection is not connected yet or a read is already in progress, and IceRpcException with
           // IceRpcError.OperationAborted when the pending read is canceled.
 33        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 35        if (buffer.Length == 0)
 36        {
 37            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 38        }
 39
 40        if (_reader is null)
 41        {
 42            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 43        }
           // Reject a concurrent read: TrySetFlag is expected to return false when the Reading flag is already set.
           // The flag stays set until the finally block below clears it.
 44        if (!_state.TrySetFlag(State.Reading))
 45        {
 46            throw new InvalidOperationException("Reading is already in progress.");
 47        }
 48
 49        try
 50        {
 51            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 52            if (readResult.IsCanceled)
 53            {
 54                // Dispose canceled ReadAsync: Dispose(bool) calls CancelPendingRead while the Reading flag is set.
 55                throw new IceRpcException(IceRpcError.OperationAborted);
 56            }
 57            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 58            {
 59                return 0;
 60            }
 61
 62            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 63            // if we need this in other places.
 64            int read;
 65            if (readResult.Buffer.IsSingleSegment)
 66            {
 67                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 68            }
 69            else
 70            {
                   // Fill the destination segment by segment; buffer[read..] is the part of the destination that has
                   // not been written yet. Stop as soon as the destination is full.
 71                read = 0;
 72                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 73                {
 74                    read += CopySegmentToMemory(segment, buffer[read..]);
 75                    if (read == buffer.Length)
 76                    {
 77                        break;
 78                    }
 79                }
 80            }
               // Advance the reader only past the bytes that were actually copied into buffer.
 81            _reader.AdvanceTo(readResult.Buffer.GetPosition(read));
 82            return read;
 83        }
 84        finally
 85        {
               // Dispose(bool) does not complete the reader while the Reading flag is set (it only cancels the pending
               // read), so the reader is completed here when the connection was disposed during this call.
 86            if (_state.HasFlag(State.Disposed))
 87            {
 88                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 89            }
 90            _state.ClearFlag(State.Reading);
 91        }
 92
           // Copies as many bytes of source as fit into destination and returns the number of bytes copied.
 93        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 94        {
 95            if (source.Length > destination.Length)
 96            {
 97                source[0..destination.Length].CopyTo(destination);
 98                return destination.Length;
 99            }
 100            else
 101            {
 102                source.CopyTo(destination);
 103                return source.Length;
 104            }
 105        }
 106    }
 107
 108    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 109    {
            // Completes the pipe writer and returns an already-completed task; cancellationToken is not used. Throws
            // ObjectDisposedException when the connection is disposed, and InvalidOperationException when the
            // connection is not connected yet, a write is in progress, or shutdown was already requested.
 110        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 111
 112        if (_reader is null)
 113        {
 114            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 115        }
 116        if (_state.HasFlag(State.Writing))
 117        {
 118            throw new InvalidOperationException("Shutdown or writing is in progress");
 119        }
            // The ShuttingDown flag is never cleared in this class, so a second call always ends up here.
 120        if (!_state.TrySetFlag(State.ShuttingDown))
 121        {
 122            throw new InvalidOperationException("Shutdown has already been called.");
 123        }
 124
 125        _writer.Complete();
 126        return Task.CompletedTask;
 127    }
 128
 129    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 130    {
            // Hands buffer to the pipe writer and awaits the flush. Throws ObjectDisposedException when the connection
            // is disposed, ArgumentException when buffer is empty, InvalidOperationException when the connection is not
            // connected yet, has been shut down, or a write is already in progress, and IceRpcException with
            // IceRpcError.OperationAborted when the pending flush is canceled.
 131        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 132
 133        if (buffer.IsEmpty)
 134        {
 135            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 136        }
 137
 138        if (_reader is null)
 139        {
 140            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 141        }
 142        if (_state.HasFlag(State.ShuttingDown))
 143        {
 144            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 145        }
            // Reject a concurrent write: TrySetFlag is expected to return false when the Writing flag is already set.
            // The flag stays set until the finally block below clears it.
 146        if (!_state.TrySetFlag(State.Writing))
 147        {
 148            throw new InvalidOperationException("Writing is already in progress.");
 149        }
 150
 151        try
 152        {
 153            _writer.Write(buffer);
 154            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 155            if (flushResult.IsCanceled)
 156            {
 157                // Dispose canceled FlushAsync: Dispose(bool) calls CancelPendingFlush while the Writing flag is set.
 158                throw new IceRpcException(IceRpcError.OperationAborted);
 159            }
 160        }
 161        finally
 162        {
                // ShutdownWriteAsync throws while the Writing flag is set, so shutdown is not expected to have started
                // during this write.
 163            Debug.Assert(!_state.HasFlag(State.ShuttingDown));
                // Dispose(bool) does not complete the writer while the Writing flag is set (it only cancels the pending
                // flush), so the writer is completed here when the connection was disposed during this call.
 164            if (_state.HasFlag(State.Disposed))
 165            {
 166                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 167            }
 168            _state.ClearFlag(State.Writing);
 169        }
 170    }
 171
 172    public ColocConnection(TransportAddress transportAddress, PipeWriter writer)
 173    {
            // Stores the transport address (used by FinishConnect to build the endpoint) and the pipe writer used by
            // WriteAsync and ShutdownWriteAsync. _reader is not assigned here; the subclasses set it.
 174        _transportAddress = transportAddress;
 175        _writer = writer;
 176    }
 177
 178    private protected virtual void Dispose(bool disposing)
 179    {
            // Sets the Disposed flag and tears down both pipe ends. The body is guarded by TrySetFlag, which is expected
            // to succeed only for the first caller. The disposing argument is not consulted here.
 180        if (_state.TrySetFlag(State.Disposed))
 181        {
 182            // _reader can be null if connection establishment failed or didn't run.
 183            if (_reader is not null)
 184            {
 185                if (_state.HasFlag(State.Reading))
 186                {
                        // A read is in progress: cancel it and let the finally block of ReadAsync complete the reader.
 187                    _reader.CancelPendingRead();
 188                }
 189                else
 190                {
 191                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 192                }
 193            }
 194
 195            if (_state.HasFlag(State.Writing))
 196            {
                    // A write is in progress: cancel the flush and let the finally block of WriteAsync complete the
                    // writer.
 197                _writer.CancelPendingFlush();
 198            }
 199            else
 200            {
 201                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 202            }
 203        }
 204    }
 205
 206    private protected TransportConnectionInformation FinishConnect()
 207    {
            // Common tail of the ConnectAsync overrides: expects _reader to be set, throws ObjectDisposedException when
            // the connection has been disposed, and otherwise returns the connection information.
 208        Debug.Assert(_reader is not null);
 209
 210        if (_state.HasFlag(State.Disposed))
 211        {
                // Dispose(bool) skips the reader when _reader is still null at that point, so a reader assigned
                // afterwards is completed here instead.
 212            _reader.Complete();
 213            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 214        }
 215
            // The same ColocEndPoint instance is passed for the first two arguments and null for the third.
            // NOTE(review): presumably local endpoint, remote endpoint and remote certificate; confirm against
            // TransportConnectionInformation.
 216        var colocEndPoint = new ColocEndPoint(_transportAddress);
 217        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 218    }
 219
 220    private protected enum State : int
 221    {
            // Bit flags stored in _state; each member is a distinct power of two so that several can be set at once.
            // Disposed and ShuttingDown are set once and never cleared in this file; Reading and Writing are set for
            // the duration of ReadAsync and WriteAsync respectively.
 222        Disposed = 1,
 223        Reading = 2,
 224        ShuttingDown = 4,
 225        Writing = 8,
 226    }
 227}
 228
 229/// <summary>The colocated client connection class. Its reader is obtained from the connect callback supplied at construction.</summary>
 230internal class ClientColocConnection : ColocConnection
 231{
        // Callback invoked by ConnectAsync with the local pipe reader; the reader it returns becomes _reader.
 232    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
        // Reader side of the local pipe. Set by the constructor and reset to null once _connectAsync has succeeded.
 233    private PipeReader? _localPipeReader;
 234
 235    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 497236    {
 497237        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 238
               // _reader is only assigned by a successful connect below, so a non-null value means ConnectAsync was
               // already called and completed.
 497239        if (_reader is not null)
 1240        {
 1241            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 242        }
 243
 496244        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 245
 496246        if (_localPipeReader is not null)
 496247        {
 496248            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 468249            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 468250        }
 468251        return FinishConnect();
 468252    }
 253
        // Passes the writer side of localPipe to the base class and keeps the reader side to hand to connectAsync.
 254    internal ClientColocConnection(
 255        TransportAddress transportAddress,
 256        Pipe localPipe,
 257        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 525258        : base(transportAddress, localPipe.Writer)
 525259    {
 525260        _connectAsync = connectAsync;
 525261        _localPipeReader = localPipe.Reader;
 525262    }
 263
 264    private protected override void Dispose(bool disposing)
 628265    {
 628266        base.Dispose(disposing);
               // Completes the local pipe reader when it is still held here, i.e. ConnectAsync has not handed it over.
 628267        _localPipeReader?.Complete();
 628268    }
 269}
 270
 271/// <summary>The colocated server connection class. The reader is supplied at construction.</summary>
 272internal class ServerColocConnection : ColocConnection
 273{
        // Not an async method: FinishConnect runs synchronously, so an ObjectDisposedException from it is thrown
        // directly to the caller instead of being stored in the returned task.
 274    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 275        Task.FromResult(FinishConnect());
 276
        // Assigns _reader immediately, so reads and writes pass their "_reader is null" check without ConnectAsync.
 277    public ServerColocConnection(TransportAddress transportAddress, PipeWriter writer, PipeReader reader)
 278       : base(transportAddress, writer) => _reader = reader;
 279}