< Summary

Information
Class: IceRpc.Protobuf.RpcMethods.InvokerExtensions
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/InvokerExtensions.cs
Tag: 1856_27024993493
Line coverage
86%
Covered lines: 104
Uncovered lines: 16
Coverable lines: 120
Total lines: 283
Line coverage: 86.6%
Branch coverage
65%
Covered branches: 21
Total branches: 32
Branch coverage: 65.6%
Method coverage
100%
Covered methods: 7
Fully covered methods: 3
Total methods: 7
Method coverage: 100%
Full method coverage: 42.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.cctor()100%11100%
InvokeUnaryAsync(...)62.5%9878.94%
InvokeClientStreamingAsync(...)50%4477.77%
InvokeServerStreamingAsync(...)50%9878.94%
InvokeBidiStreamingAsync(...)50%4477.77%
ReceiveResponseAsync()100%44100%
ReceiveStreamingResponseAsync()100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/InvokerExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using IceRpc.Features;
 5using IceRpc.Protobuf.RpcMethods.Internal;
 6using System.Collections.Immutable;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Protobuf.RpcMethods;
 10
 11/// <summary>Provides extension methods for <see cref="IInvoker" />.</summary>
 12public static class InvokerExtensions
 13{
 114    private static readonly IDictionary<RequestFieldKey, OutgoingFieldValue> _idempotentFields =
 115        new Dictionary<RequestFieldKey, OutgoingFieldValue>
 116        {
 117            [RequestFieldKey.Idempotent] = default
 118        }.ToImmutableDictionary();
 19
 20    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf unary RPCs.
 21    /// </summary>
 22    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 23    /// <param name="invoker">The invoker used to send the request.</param>
 24    /// <param name="serviceAddress">The address of the target service.</param>
 25    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 26    /// <param name="inputMessage">The input message to encode in the request payload.</param>
 27    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 28    /// <param name="encodeOptions">The options to customize the encoding of the request payload.</param>
 29    /// <param name="features">The invocation features.</param>
 30    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 31    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 32    /// <returns>The operation's return value.</returns>
 33    public static Task<TOutput> InvokeUnaryAsync<TOutput>(
 34        this IInvoker invoker,
 35        ServiceAddress serviceAddress,
 36        string operation,
 37        IMessage inputMessage,
 38        MessageParser<TOutput> messageParser,
 39        ProtobufEncodeOptions? encodeOptions = null,
 40        IFeatureCollection? features = null,
 41        bool idempotent = false,
 42        CancellationToken cancellationToken = default) where TOutput : class, IMessage<TOutput>
 1543    {
 1544        var request = new OutgoingRequest(serviceAddress)
 1545        {
 1546            Features = features ?? FeatureCollection.Empty,
 1547            Fields = idempotent ?
 1548                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 1549            Operation = operation,
 1550            Payload = inputMessage.EncodeAsLengthPrefixedMessage(
 1551                encodeOptions?.PipeOptions ?? ProtobufEncodeOptions.Default.PipeOptions),
 1552        };
 53
 54        Task<IncomingResponse> responseTask;
 55        try
 1556        {
 1557            responseTask = invoker.InvokeAsync(request, cancellationToken);
 1558        }
 059        catch
 060        {
 061            request.Dispose();
 062            throw;
 63        }
 64
 65        // ReceiveResponseAsync is responsible for disposing the request
 1566        return ReceiveResponseAsync(messageParser, responseTask, request, cancellationToken);
 1567    }
 68
 69    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf client-streaming
 70    /// RPCs.</summary>
 71    /// <typeparam name="TInput">The type of the input message.</typeparam>
 72    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 73    /// <param name="invoker">The invoker used to send the request.</param>
 74    /// <param name="serviceAddress">The address of the target service.</param>
 75    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 76    /// <param name="stream">The stream of input message to encode in the request payload continuation.</param>
 77    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 78    /// <param name="encodeOptions">The options to customize the encoding of the request payload continuation.</param>
 79    /// <param name="features">The invocation features.</param>
 80    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 81    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 82    /// <returns>The operation's return value.</returns>
 83    public static Task<TOutput> InvokeClientStreamingAsync<TInput, TOutput>(
 84        this IInvoker invoker,
 85        ServiceAddress serviceAddress,
 86        string operation,
 87        IAsyncEnumerable<TInput> stream,
 88        MessageParser<TOutput> messageParser,
 89        ProtobufEncodeOptions? encodeOptions = null,
 90        IFeatureCollection? features = null,
 91        bool idempotent = false,
 92        CancellationToken cancellationToken = default) where TInput : class, IMessage<TInput>
 93                                                       where TOutput : class, IMessage<TOutput>
 494    {
 495        var request = new OutgoingRequest(serviceAddress)
 496        {
 497            Features = features ?? FeatureCollection.Empty,
 498            Fields = idempotent ?
 499                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 4100            Operation = operation,
 4101            PayloadContinuation = stream.ToPipeReader(encodeOptions),
 4102        };
 103
 104        Task<IncomingResponse> responseTask;
 105        try
 4106        {
 4107            responseTask = invoker.InvokeAsync(request, cancellationToken);
 4108        }
 0109        catch
 0110        {
 0111            request.Dispose();
 0112            throw;
 113        }
 114
 115        // ReceiveResponseAsync is responsible for disposing the request
 4116        return ReceiveResponseAsync(messageParser, responseTask, request, cancellationToken);
 4117    }
 118
 119    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf server-streaming
 120    /// RPCs.</summary>
 121    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 122    /// <param name="invoker">The invoker used to send the request.</param>
 123    /// <param name="serviceAddress">The address of the target service.</param>
 124    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 125    /// <param name="inputMessage">The input message to encode in the request payload.</param>
 126    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 127    /// <param name="encodeOptions">The options to customize the encoding of the request payload.</param>
 128    /// <param name="features">The invocation features.</param>
 129    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 130    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 131    /// <returns>The operation's return value.</returns>
 132    public static Task<IAsyncStream<TOutput>> InvokeServerStreamingAsync<TOutput>(
 133        this IInvoker invoker,
 134        ServiceAddress serviceAddress,
 135        string operation,
 136        IMessage inputMessage,
 137        MessageParser<TOutput> messageParser,
 138        ProtobufEncodeOptions? encodeOptions = null,
 139        IFeatureCollection? features = null,
 140        bool idempotent = false,
 141        CancellationToken cancellationToken = default) where TOutput : class, IMessage<TOutput>
 5142    {
 5143        var request = new OutgoingRequest(serviceAddress)
 5144        {
 5145            Features = features ?? FeatureCollection.Empty,
 5146            Fields = idempotent ?
 5147                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 5148            Operation = operation,
 5149            Payload = inputMessage.EncodeAsLengthPrefixedMessage(
 5150                encodeOptions?.PipeOptions ?? ProtobufEncodeOptions.Default.PipeOptions),
 5151        };
 152
 153        Task<IncomingResponse> responseTask;
 154        try
 5155        {
 5156            responseTask = invoker.InvokeAsync(request, cancellationToken);
 5157        }
 0158        catch
 0159        {
 0160            request.Dispose();
 0161            throw;
 162        }
 163
 164        // ReceiveStreamingResponseAsync is responsible for disposing the request
 5165        return ReceiveStreamingResponseAsync(messageParser, responseTask, request);
 5166    }
 167
 168    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf bidi-streaming
 169    /// RPCs.</summary>
 170    /// <typeparam name="TInput">The type of the input message.</typeparam>
 171    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 172    /// <param name="invoker">The invoker used to send the request.</param>
 173    /// <param name="serviceAddress">The address of the target service.</param>
 174    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 175    /// <param name="stream">The stream of input message to encode in the request payload continuation.</param>
 176    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 177    /// <param name="encodeOptions">The options to customize the encoding of the request payload continuation.</param>//
 178    /// <param name="features">The invocation features.</param>
 179    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 180    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 181    /// <returns>The operation's return value.</returns>
 182    public static Task<IAsyncStream<TOutput>> InvokeBidiStreamingAsync<TInput, TOutput>(
 183        this IInvoker invoker,
 184        ServiceAddress serviceAddress,
 185        string operation,
 186        IAsyncEnumerable<TInput> stream,
 187        MessageParser<TOutput> messageParser,
 188        ProtobufEncodeOptions? encodeOptions = null,
 189        IFeatureCollection? features = null,
 190        bool idempotent = false,
 191        CancellationToken cancellationToken = default) where TInput : class, IMessage<TInput>
 192                                                       where TOutput : class, IMessage<TOutput>
 4193    {
 4194        var request = new OutgoingRequest(serviceAddress)
 4195        {
 4196            Features = features ?? FeatureCollection.Empty,
 4197            Fields = idempotent ?
 4198                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 4199            Operation = operation,
 4200            PayloadContinuation = stream.ToPipeReader(encodeOptions),
 4201        };
 202
 203        Task<IncomingResponse> responseTask;
 204        try
 4205        {
 4206            responseTask = invoker.InvokeAsync(request, cancellationToken);
 4207        }
 0208        catch
 0209        {
 0210            request.Dispose();
 0211            throw;
 212        }
 213
 214        // ReceiveStreamingResponseAsync is responsible for disposing the request
 4215        return ReceiveStreamingResponseAsync(messageParser, responseTask, request);
 4216    }
 217
 218    private static async Task<TOutput> ReceiveResponseAsync<TOutput>(
 219        MessageParser<TOutput> messageParser,
 220        Task<IncomingResponse> responseTask,
 221        OutgoingRequest request,
 222        CancellationToken cancellationToken) where TOutput : class, IMessage<TOutput>
 19223    {
 224        try
 19225        {
 19226            IncomingResponse response = await responseTask.ConfigureAwait(false);
 19227            if (response.StatusCode == StatusCode.Ok)
 16228            {
 16229                IProtobufFeature protobufFeature = request.Features.Get<IProtobufFeature>() ?? ProtobufFeature.Default;
 16230                return await response.Payload.DecodeProtobufMessageAsync(
 16231                    messageParser,
 16232                    protobufFeature.MaxMessageLength,
 16233                    // The payload of the response to a oneway request is empty and we decode it as a
 16234                    // default-constructed response message.
 16235                    acceptEmptyPayload: request.IsOneway,
 16236                    cancellationToken).ConfigureAwait(false);
 237            }
 238            else
 3239            {
 240                // IceRPC guarantees the error message is non-null when StatusCode > Ok.
 3241                throw new DispatchException(response.StatusCode, response.ErrorMessage!)
 3242                {
 3243                    ConvertToInternalError = true
 3244                };
 245            }
 246        }
 247        finally
 19248        {
 19249            request.Dispose();
 19250        }
 16251    }
 252
 253    private static async Task<IAsyncStream<TOutput>> ReceiveStreamingResponseAsync<TOutput>(
 254        MessageParser<TOutput> messageParser,
 255        Task<IncomingResponse> responseTask,
 256        OutgoingRequest request) where TOutput : class, IMessage<TOutput>
 9257    {
 258        try
 9259        {
 9260            IncomingResponse response = await responseTask.ConfigureAwait(false);
 9261            if (response.StatusCode == StatusCode.Ok)
 6262            {
 6263                IProtobufFeature protobufFeature = request.Features.Get<IProtobufFeature>() ?? ProtobufFeature.Default;
 6264                PipeReader payload = response.DetachPayload();
 6265                return payload.ToAsyncStream(
 6266                    messageParser,
 6267                    protobufFeature.MaxMessageLength);
 268            }
 269            else
 3270            {
 271                // IceRPC guarantees the error message is non-null when StatusCode > Ok.
 3272                throw new DispatchException(response.StatusCode, response.ErrorMessage!)
 3273                {
 3274                    ConvertToInternalError = true
 3275                };
 276            }
 277        }
 278        finally
 9279        {
 9280            request.Dispose();
 9281        }
 6282    }
 283}