< Summary

Information
Class: IceRpc.Transports.Coloc.Internal.ColocConnection
Assembly: IceRpc.Transports.Coloc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs
Tag: 701_22528036593
Line coverage
82%
Covered lines: 110
Uncovered lines: 23
Coverable lines: 133
Total lines: 274
Line coverage: 82.7%
Branch coverage
81%
Covered branches: 39
Total branches: 48
Branch coverage: 81.2%
Method coverage
100%
Covered methods: 8
Total methods: 8
Method coverage: 100%

Metrics

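| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| Dispose() | 100% | 1 | 1 | 100% |
| ReadAsync() | 88.88% | 18.63 | 18 | 87.5% |
| CopySegmentToMemory() | 100% | 2 | 2 | 100% |
| ShutdownWriteAsync(...) | 83.33% | 6.11 | 6 | 85.71% |
| WriteAsync() | 75% | 13.83 | 12 | 76.66% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| Dispose(...) | 75% | 9.3 | 8 | 72.72% |
| FinishConnect() | 50% | 2.15 | 2 | 66.66% |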

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Transports.Coloc/Internal/ColocConnection.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Transports.Coloc.Internal;
 10
 11/// <summary>The colocated connection class to exchange data within the same process. The implementation copies the send
 12/// buffer into the receive buffer.</summary>
 13internal abstract class ColocConnection : IDuplexConnection
 14{
       // The pipe reader this connection reads from. It remains null until a derived class assigns it; ReadAsync,
       // WriteAsync and ShutdownWriteAsync all throw InvalidOperationException while it is null.
 15    private protected PipeReader? _reader;
 16    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 17    // locking.
 18    private protected int _state;
 19
       // The server address used by FinishConnect to create the ColocEndPoint.
 20    private readonly ServerAddress _serverAddress;
       // The pipe writer this connection writes to.
 21    private readonly PipeWriter _writer;
 22
       /// <summary>Connects this connection. Each derived connection class provides its own implementation.</summary>
       /// <param name="cancellationToken">A cancellation token for the connection establishment.</param>
       /// <returns>The transport connection information.</returns>
 23    public abstract Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken);
 24
       /// <summary>Disposes this connection by calling <see cref="Dispose(bool)" />.</summary>
 25    public void Dispose()
 114426    {
 114427        Dispose(true);
 114428        GC.SuppressFinalize(this);
 114429    }
 30
       /// <summary>Reads data from the pipe reader and copies as many bytes as fit into <paramref name="buffer" />.
       /// Only the copied bytes are consumed from the pipe reader.</summary>
       /// <param name="buffer">The destination buffer.</param>
       /// <param name="cancellationToken">The cancellation token given to the pipe reader's ReadAsync.</param>
       /// <returns>The number of bytes copied, or 0 when the pipe reader is completed and has no more data.</returns>
       /// <exception cref="ObjectDisposedException">Thrown if the Disposed flag is set.</exception>
       /// <exception cref="InvalidOperationException">Thrown if the reader is not set yet or if a read is already in
       /// progress.</exception>
       /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.OperationAborted" /> if the pending
       /// read was canceled.</exception>
 31    public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
 6974832    {
 6974833        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 34
 6974735        if (_reader is null)
 136        {
 137            throw new InvalidOperationException("Reading is not allowed before connection is connected.");
 38        }
           // The Reading flag also tells Dispose to cancel the pending read instead of completing the reader.
 6974639        if (!_state.TrySetFlag(State.Reading))
 140        {
 141            throw new InvalidOperationException("Reading is already in progress.");
 42        }
 43
 44        try
 6974545        {
 6974546            ReadResult readResult = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 6913347            if (readResult.IsCanceled)
 048            {
 49                // Dispose canceled ReadAsync.
 050                throw new IceRpcException(IceRpcError.OperationAborted);
 51            }
 6913352            else if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
 13953            {
 13954                return 0;
 55            }
 56
 57            // We could eventually add a CopyTo(this ReadOnlySequence<byte> src, Memory<byte> dest) extension method
 58            // if we need this in other places.
 59            int read;
 6899460            if (readResult.Buffer.IsSingleSegment)
 547861            {
 547862                read = CopySegmentToMemory(readResult.Buffer.First, buffer);
 547863            }
 64            else
 6351665            {
                   // Copy segment after segment until the destination buffer is full or all segments are copied.
 6351666                read = 0;
 33962467                foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
 10629668                {
 10629669                    read += CopySegmentToMemory(segment, buffer[read..]);
 10629670                    if (read == buffer.Length)
 6351671                    {
 6351672                        break;
 73                    }
 4278074                }
 6351675            }
               // Advance past the copied bytes only; the data that didn't fit is left for the next read.
 6899476            _reader.AdvanceTo(readResult.Buffer.GetPosition(read));
 6899477            return read;
 78        }
 79        finally
 6974580        {
               // Dispose only cancels the pending read when the Reading flag is set; it doesn't complete the reader
               // in this case, so the reader is completed here.
 6974581            if (_state.HasFlag(State.Disposed))
 082            {
 083                _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 084            }
 6974585            _state.ClearFlag(State.Reading);
 6974586        }
 87
           // Copies source into destination, truncating source to the destination length when it is larger. Returns
           // the number of bytes copied.
 88        static int CopySegmentToMemory(ReadOnlyMemory<byte> source, Memory<byte> destination)
 11177489        {
 11177490            if (source.Length > destination.Length)
 6141091            {
 6141092                source[0..destination.Length].CopyTo(destination);
 6141093                return destination.Length;
 94            }
 95            else
 5036496            {
 5036497                source.CopyTo(destination);
 5036498                return source.Length;
 99            }
 111774100        }
 69133101    }
 102
       /// <summary>Shuts down the write side of this connection by completing the pipe writer.</summary>
       /// <param name="cancellationToken">The cancellation token; this implementation doesn't use it.</param>
       /// <returns>A completed task.</returns>
       /// <exception cref="ObjectDisposedException">Thrown if the Disposed flag is set.</exception>
       /// <exception cref="InvalidOperationException">Thrown if the reader is not set yet, if a write is in progress
       /// or if the ShuttingDown flag is already set.</exception>
 103    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
 148104    {
 148105        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 106
 148107        if (_reader is null)
 1108        {
 1109            throw new InvalidOperationException("Shutdown is not allowed before the connection is connected.");
 110        }
 147111        if (_state.HasFlag(State.Writing))
 1112        {
 1113            throw new InvalidOperationException("Shutdown or writing is in progress");
 114        }
 146115        if (!_state.TrySetFlag(State.ShuttingDown))
 0116        {
 0117            throw new InvalidOperationException("Shutdown has already been called.");
 118        }
 119
 146120        _writer.Complete();
 146121        return Task.CompletedTask;
 146122    }
 123
       /// <summary>Writes <paramref name="buffer" /> to the pipe writer and flushes it.</summary>
       /// <param name="buffer">The data to write. It can't be empty.</param>
       /// <param name="cancellationToken">The cancellation token given to the pipe writer's FlushAsync.</param>
       /// <exception cref="ObjectDisposedException">Thrown if the Disposed flag is set.</exception>
       /// <exception cref="ArgumentException">Thrown if <paramref name="buffer" /> is empty.</exception>
       /// <exception cref="InvalidOperationException">Thrown if the reader is not set yet, if the ShuttingDown flag
       /// is set or if a write is already in progress.</exception>
       /// <exception cref="IceRpcException">Thrown with <see cref="IceRpcError.OperationAborted" /> if the pending
       /// flush was canceled.</exception>
 124    public async ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
 7442125    {
 7442126        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 127
 7441128        if (buffer.IsEmpty)
 0129        {
 0130            throw new ArgumentException($"The {nameof(buffer)} cannot be empty.", nameof(buffer));
 131        }
 132
 7441133        if (_reader is null)
 1134        {
 1135            throw new InvalidOperationException("Writing is not allowed before the connection is connected.");
 136        }
 7440137        if (_state.HasFlag(State.ShuttingDown))
 2138        {
 2139            throw new InvalidOperationException("Writing is not allowed after the connection is shutdown.");
 140        }
           // The Writing flag also tells Dispose to cancel the pending flush instead of completing the writer.
 7438141        if (!_state.TrySetFlag(State.Writing))
 1142        {
 1143            throw new InvalidOperationException("Writing is already in progress.");
 144        }
 145
 146        try
 7437147        {
 7437148            _writer.Write(buffer);
 7437149            FlushResult flushResult = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 7427150            if (flushResult.IsCanceled)
 0151            {
 152                // Dispose canceled FlushAsync.
 0153                throw new IceRpcException(IceRpcError.OperationAborted);
 154            }
 7427155        }
 156        finally
 7437157        {
               // ShutdownWriteAsync throws when the Writing flag is set, so it is not expected to set ShuttingDown
               // while this write is in progress.
 7437158            Debug.Assert(!_state.HasFlag(State.ShuttingDown));
               // Dispose only cancels the pending flush when the Writing flag is set; it doesn't complete the writer
               // in this case, so the writer is completed here.
 7437159            if (_state.HasFlag(State.Disposed))
 0160            {
 0161                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 0162            }
 7437163            _state.ClearFlag(State.Writing);
 7437164        }
 7427165    }
 166
       /// <summary>Constructs a colocated connection.</summary>
       /// <param name="serverAddress">The server address, used to create the endpoint returned by
       /// <see cref="FinishConnect" />.</param>
       /// <param name="writer">The pipe writer this connection writes to.</param>
 967167    public ColocConnection(ServerAddress serverAddress, PipeWriter writer)
 967168    {
 967169        _serverAddress = serverAddress;
 967170        _writer = writer;
 967171    }
 172
       /// <summary>Sets the Disposed flag and, the first time the flag is set, releases the pipe reader and writer.
       /// A reader or writer with an operation in progress is not completed here: its pending operation is canceled
       /// and the finally block of <see cref="ReadAsync" /> or <see cref="WriteAsync" /> completes it.</summary>
       /// <param name="disposing">Not used by this implementation.</param>
 173    private protected virtual void Dispose(bool disposing)
 1144174    {
           // TrySetFlag is used as the guard: the body below only runs when this call is the one that set the flag.
 1144175        if (_state.TrySetFlag(State.Disposed))
 965176        {
 177            // _reader can be null if connection establishment failed or didn't run.
 965178            if (_reader is not null)
 909179            {
 909180                if (_state.HasFlag(State.Reading))
 0181                {
 0182                    _reader.CancelPendingRead();
 0183                }
 184                else
 909185                {
 909186                    _reader.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 909187                }
 909188            }
 189
 965190            if (_state.HasFlag(State.Writing))
 0191            {
 0192                _writer.CancelPendingFlush();
 0193            }
 194            else
 965195            {
 965196                _writer.Complete(new IceRpcException(IceRpcError.ConnectionAborted));
 965197            }
 965198        }
 1144199    }
 200
       /// <summary>Completes the connection establishment. The derived classes call this method from their
       /// ConnectAsync implementation once the reader is set.</summary>
       /// <returns>The transport connection information, created with the same <see cref="ColocEndPoint" /> for its
       /// first two arguments (presumably the local and remote network addresses; confirm against
       /// TransportConnectionInformation) and null for its third argument.</returns>
       /// <exception cref="ObjectDisposedException">Thrown, after completing the reader, if the Disposed flag is
       /// set.</exception>
 201    private protected TransportConnectionInformation FinishConnect()
 884202    {
 884203        Debug.Assert(_reader is not null);
 204
 884205        if (_state.HasFlag(State.Disposed))
 0206        {
 0207            _reader.Complete();
 0208            throw new ObjectDisposedException($"{typeof(ColocConnection)}");
 209        }
 210
 884211        var colocEndPoint = new ColocEndPoint(_serverAddress);
 884212        return new TransportConnectionInformation(colocEndPoint, colocEndPoint, null);
 884213    }
 214
       /// <summary>The flags stored in <see cref="_state" />. Each value uses a distinct bit.</summary>
 215    private protected enum State : int
 216    {
 217        Disposed = 1,
 218        Reading = 2,
 219        ShuttingDown = 4,
 220        Writing = 8,
 221    }
 222}
 223
 224/// <summary>The colocated client connection class.</summary>
 225internal class ClientColocConnection : ColocConnection
 226{
        // The delegate called by ConnectAsync with the local pipe reader. The pipe reader it returns becomes the reader
        // of this connection.
 227    private readonly Func<PipeReader, CancellationToken, Task<PipeReader>> _connectAsync;
        // The reader of the local pipe given to the constructor. It is set to null once _connectAsync has returned
        // successfully; until then, Dispose completes it.
 228    private PipeReader? _localPipeReader;
 229
        /// <summary>Connects this connection by passing the local pipe reader to the connect delegate and using the
        /// pipe reader returned by the delegate as this connection's reader.</summary>
        /// <param name="cancellationToken">The cancellation token given to the connect delegate.</param>
        /// <returns>The transport connection information returned by FinishConnect.</returns>
        /// <exception cref="ObjectDisposedException">Thrown if the Disposed flag is set.</exception>
        /// <exception cref="InvalidOperationException">Thrown if the reader is already set.</exception>
 230    public override async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 231    {
 232        ObjectDisposedException.ThrowIf(_state.HasFlag(State.Disposed), this);
 233
 234        if (_reader is not null)
 235        {
 236            throw new InvalidOperationException("Connection establishment cannot be called twice.");
 237        }
 238
 239        Debug.Assert(!_state.HasFlag(State.ShuttingDown));
 240
 241        if (_localPipeReader is not null)
 242        {
 243            _reader = await _connectAsync(_localPipeReader, cancellationToken).ConfigureAwait(false);
 244            _localPipeReader = null; // The server-side connection is now responsible for completing the pipe reader.
 245        }
 246        return FinishConnect();
 247    }
 248
        /// <summary>Constructs a colocated client connection.</summary>
        /// <param name="serverAddress">The server address given to the base class.</param>
        /// <param name="localPipe">The local pipe. Its writer is given to the base class and its reader is kept
        /// until it is passed to <paramref name="connectAsync" />.</param>
        /// <param name="connectAsync">The delegate called by <see cref="ConnectAsync" />.</param>
 249    internal ClientColocConnection(
 250        ServerAddress serverAddress,
 251        Pipe localPipe,
 252        Func<PipeReader, CancellationToken, Task<PipeReader>> connectAsync)
 253        : base(serverAddress, localPipe.Writer)
 254    {
 255        _connectAsync = connectAsync;
 256        _localPipeReader = localPipe.Reader;
 257    }
 258
        /// <summary>Runs the base class disposal, then completes the local pipe reader when it is still held by this
        /// connection (non-null).</summary>
 259    private protected override void Dispose(bool disposing)
 260    {
 261        base.Dispose(disposing);
 262        _localPipeReader?.Complete();
 263    }
 264}
 265
 266/// <summary>The colocated server connection class.</summary>
 267internal class ServerColocConnection : ColocConnection
 268{
        /// <summary>Returns a completed task holding the result of FinishConnect. The reader is already set by the
        /// constructor, so FinishConnect is called synchronously.</summary>
        /// <param name="cancellationToken">The cancellation token; this implementation doesn't use it.</param>
 269    public override Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
 270        Task.FromResult(FinishConnect());
 271
        /// <summary>Constructs a colocated server connection.</summary>
        /// <param name="serverAddress">The server address given to the base class.</param>
        /// <param name="writer">The pipe writer given to the base class.</param>
        /// <param name="reader">The pipe reader this connection reads from.</param>
 272    public ServerColocConnection(ServerAddress serverAddress, PipeWriter writer, PipeReader reader)
 273       : base(serverAddress, writer) => _reader = reader;
 274}