< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ClientColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 275_13775359185
Line coverage
100%
Covered lines: 22
Uncovered lines: 0
Coverable lines: 22
Total lines: 274
Line coverage: 100%
Branch coverage
100%
Covered branches: 6
Total branches: 6
Branch coverage: 100%
Method coverage
100%
Covered methods: 3
Total methods: 3
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
ConnectAsync()100%44100%
.ctor(...)100%11100%
Dispose(...)100%22100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
 15    private protected PipeReader? _reader;
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
 20    private readonly ServerAddress _serverAddress;
 21    private readonly PipeWriter _writer;
 22
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
 24
 25    public void Dispose()
 26    {
 27        Dispose(true);
 28        GC.SuppressFinalize(this);
 29    }
 30
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 32    {
 33        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 35        if (_reader is null)
 36        {
 37            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 38        }
 39        if (!_state.TrySetFlag(State.Reading))
 40        {
 41            throw new InvalidOperationException("Reading is already in progress.");
 42        }
 43
 44        try
 45        {
 46            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 47            if (readResult.IsCanceled)
 48            {
 49                // Dispose canceled ReadAsync.
 50                throw new IceRpcException(IceRpcError.OperationAborted);
 51            }
 52            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 53            {
 54                return 0;
 55            }
 56
 57            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 58            // if we need this in other places.
 59            int read;
 60            if (readResult.Buffer.IsSingleSegment)
 61            {
 62                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 63            }
 64            else
 65            {
 66                read = 0;
 67                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 68                {
 69                    read += CopySegmentToMemory(segment, buffer[read..]);
 70                    if (read == buffer.Length)
 71                    {
 72                        break;
 73                    }
 74                }
 75            }
 76            _reader.AdvanceTo(readResult.Buffer.GetPosition(read));
 77            return read;
 78        }
 79        finally
 80        {
 81            if (_state.HasFlag(State.Disposed))
 82            {
 83                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 84            }
 85            _state.ClearFlag(State.Reading);
 86        }
 87
 88        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 89        {
 90            if (source.Length > destination.Length)
 91            {
 92                source[0..destination.Length].CopyTo(destination);
 93                return destination.Length;
 94            }
 95            else
 96            {
 97                source.CopyTo(destination);
 98                return source.Length;
 99            }
 100        }
 101    }
 102
 103    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 104    {
 105        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 106
 107        if (_reader is null)
 108        {
 109            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 110        }
 111        if (_state.HasFlag(State.Writing))
 112        {
 113            throw new InvalidOperationException("Shutdown or writing is in progress");
 114        }
 115        if (!_state.TrySetFlag(State.ShuttingDown))
 116        {
 117            throw new InvalidOperationException("Shutdown has already been called.");
 118        }
 119
 120        _writer.Complete();
 121        return Task.CompletedTask;
 122    }
 123
 124    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 125    {
 126        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 127
 128        if (buffer.IsEmpty)
 129        {
 130            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 131        }
 132
 133        if (_reader is null)
 134        {
 135            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 136        }
 137        if (_state.HasFlag(State.ShuttingDown))
 138        {
 139            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 140        }
 141        if (!_state.TrySetFlag(State.Writing))
 142        {
 143            throw new InvalidOperationException("Writing is already in progress.");
 144        }
 145
 146        try
 147        {
 148            _writer.Write(buffer);
 149            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 150            if (flushResult.IsCanceled)
 151            {
 152                // Dispose canceled ReadAsync.
 153                throw new IceRpcException(IceRpcError.OperationAborted);
 154            }
 155        }
 156        finally
 157        {
 158            Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 159            if (_state.HasFlag(State.Disposed))
 160            {
 161                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 162            }
 163            _state.ClearFlag(State.Writing);
 164        }
 165    }
 166
 167    public ColocConnection(ServerAddress serverAddress, PipeWriter writer)
 168    {
 169        _serverAddress = serverAddress;
 170        _writer = writer;
 171    }
 172
 173    private protected virtual void Dispose(bool disposing)
 174    {
 175        if (_state.TrySetFlag(State.Disposed))
 176        {
 177            // _reader can be null if connection establishment failed or didn't run.
 178            if (_reader is not null)
 179            {
 180                if (_state.HasFlag(State.Reading))
 181                {
 182                    _reader.CancelPendingRead();
 183                }
 184                else
 185                {
 186                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 187                }
 188            }
 189
 190            if (_state.HasFlag(State.Writing))
 191            {
 192                _writer.CancelPendingFlush();
 193            }
 194            else
 195            {
 196                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 197            }
 198        }
 199    }
 200
 201    private protected TransportConnectionInformation FinishConnect()
 202    {
 203        Debug.Assert(_reader is not null);
 204
 205        if (_state.HasFlag(State.Disposed))
 206        {
 207            _reader.Complete();
 208            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 209        }
 210
 211        var colocEndPoint = new ColocEndPoint(_serverAddress);
 212        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 213    }
 214
 215    private protected enum State : int
 216    {
 217        Disposed = 1,
 218        Reading = 2,
 219        ShuttingDown = 4,
 220        Writing = 8,
 221    }
 222}
 223
 224/// <summary>The colocated client connection class.</summary>
 225internal class ClientColocConnection : ColocConnection
 226{
 227    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
 228    private PipeReader? _localPipeReader;
 229
 230    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 951231    {
 951232        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 233
 951234        if (_reader is not null)
 2235        {
 2236            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 237        }
 238
 949239        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 240
 949241        if (_localPipeReader is not null)
 949242        {
 949243            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 892244            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 892245        }
 892246        return FinishConnect();
 892247    }
 248
 249    internal ClientColocConnection(
 250        ServerAddress serverAddress,
 251        Pipe localPipe,
 252        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 1007253        : base(serverAddress, localPipe.Writer)
 1007254    {
 1007255        _connectAsync = connectAsync;
 1007256        _localPipeReader = localPipe.Reader;
 1007257    }
 258
 259    private protected override void Dispose(bool disposing)
 1195260    {
 1195261        base.Dispose(disposing);
 1195262        _localPipeReader?.Complete();
 1195263    }
 264}
 265
 266/// <summary>The colocated server connection class.</summary>
 267internal class ServerColocConnection : ColocConnection
 268{
 269    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 270        Task.FromResult(FinishConnect());
 271
 272    public ServerColocConnection(ServerAddress serverAddress, PipeWriter writer, PipeReader reader)
 273       : base(serverAddress, writer) => _reader = reader;
 274}