Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managed the invocation of remote multi-parameter actor methods withou… #1304

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions src/Dapr.Actors/Runtime/ActorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ namespace Dapr.Actors.Runtime
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors;
Expand All @@ -26,7 +28,7 @@ namespace Dapr.Actors.Runtime
using Microsoft.Extensions.Logging;

// The ActorManager serves as a cache for a variety of different concerns related to an Actor type
// as well as the runtime managment for Actor instances of that type.
// as well as the runtime management for Actor instances of that type.
internal sealed class ActorManager
{
private const string ReceiveReminderMethodName = "ReceiveReminderAsync";
Expand Down Expand Up @@ -55,7 +57,7 @@ internal sealed class ActorManager

internal ActorManager(
ActorRegistration registration,
ActorActivator activator,
ActorActivator activator,
JsonSerializerOptions jsonSerializerOptions,
bool useJsonSerialization,
ILoggerFactory loggerFactory,
Expand Down Expand Up @@ -161,8 +163,32 @@ async Task<object> RequestFunc(Actor actor, CancellationToken ct)
}
else
{
var errorMsg = $"Method {string.Concat(methodInfo.DeclaringType.Name, ".", methodInfo.Name)} has more than one parameter and can't be invoked through http";
throw new ArgumentException(errorMsg);
#if NET8_0_OR_GREATER
var jsonNodes = await JsonNode.ParseAsync(requestBodyStream);
#else
using var streamReader = new StreamReader(requestBodyStream);
var jsonString = await streamReader.ReadToEndAsync();
var jsonNodes = JsonNode.Parse(jsonString);
#endif
var parametersList = parameters.Select(parameter =>
{
// If the parameter is of type CancellationToken, return the token.
if (parameter.ParameterType == typeof(CancellationToken))
{
return ct;
}

// Deserialize the parameter from the JSON node.
// The parameter name is used to look up the JSON node and must match the JSON property name.
var parameterName = parameter.Name;
var parameterType = parameter.ParameterType;
var jsonNode = jsonNodes[parameterName];
var deserializedType = JsonSerializer.Deserialize(jsonNode, parameterType, jsonSerializerOptions);

return deserializedType;
});

awaitable = methodInfo.Invoke(actor, parametersList.ToArray());
}

await awaitable;
Expand Down Expand Up @@ -190,7 +216,7 @@ async Task<object> RequestFunc(Actor actor, CancellationToken ct)
var resultType = methodInfo.ReturnType.GenericTypeArguments[0];
await JsonSerializer.SerializeAsync(responseBodyStream, result, resultType, jsonSerializerOptions);
#else
await JsonSerializer.SerializeAsync<object>(responseBodyStream, result, jsonSerializerOptions);
await JsonSerializer.SerializeAsync<object>(responseBodyStream, result, jsonSerializerOptions);
#endif

}
Expand Down Expand Up @@ -222,9 +248,9 @@ async Task<byte[]> RequestFunc(Actor actor, CancellationToken ct)

internal async Task FireTimerAsync(ActorId actorId, Stream requestBodyStream, CancellationToken cancellationToken = default)
{
#pragma warning disable 0618
#pragma warning disable 0618
var timerData = await JsonSerializer.DeserializeAsync<TimerInfo>(requestBodyStream);
#pragma warning restore 0618
#pragma warning restore 0618

// Create a Func to be invoked by common method.
async Task<byte[]> RequestFunc(Actor actor, CancellationToken ct)
Expand Down Expand Up @@ -286,7 +312,7 @@ private async Task<ActorActivatorState> CreateActorAsync(ActorId actorId)
{
StateProvider = new DaprStateProvider(this.daprInteractor, this.jsonSerializerOptions),
};
var state = await this.activator.CreateAsync(host);
var state = await this.activator.CreateAsync(host);
this.logger.LogDebug("Finished creating Actor of type {ActorType} with ActorId {ActorId}", this.ActorTypeInfo.ImplementationType, actorId);
return state;
}
Expand Down Expand Up @@ -325,7 +351,7 @@ public bool TryGetActorAsync(ActorId id, out Actor actor)
var found = this.activeActors.TryGetValue(id, out var state);
actor = found ? state.Actor : default;
return found;
}
}

private static ActorMessageSerializersManager IntializeSerializationManager(
IActorMessageBodySerializationProvider serializationProvider)
Expand All @@ -344,7 +370,7 @@ private async Task<T> DispatchInternalAsync<T>(ActorId actorId, ActorMethodConte
}

if (!this.activeActors.TryGetValue(actorId, out var state))
{
{
var errorMsg = $"Actor {actorId} is not yet activated.";
throw new InvalidOperationException(errorMsg);
}
Expand Down
65 changes: 50 additions & 15 deletions test/Dapr.Actors.Test/Runtime/ActorRuntimeTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,19 +15,18 @@ namespace Dapr.Actors.Test
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors;
using Dapr.Actors.Client;
using Dapr.Actors.Runtime;
using Microsoft.Extensions.Logging;
using Xunit;
using Dapr.Actors.Client;
using System.Reflection;
using System.Threading;

public sealed class ActorRuntimeTests
{
Expand All @@ -46,7 +45,7 @@ private interface ITestActor : IActor
public void TestInferredActorType()
{
var actorType = typeof(TestActor);

var options = new ActorRuntimeOptions();
options.Actors.RegisterActor<TestActor>();
var runtime = new ActorRuntime(options, loggerFactory, activatorFactory, proxyFactory);
Expand Down Expand Up @@ -119,6 +118,10 @@ public interface INotRemotedActor : IActor
Task<string> SingleArgumentAsync(bool arg);

Task<string> SingleArgumentWithCancellationAsync(bool arg, CancellationToken cancellationToken = default);

Task<string> MultiArgumentsAsync(bool arg1, string arg2);

Task<string> MultiArgumentsWithCancellationAsync(bool arg1, string arg2, CancellationToken cancellationToken = default);
}

public sealed class NotRemotedActor : Actor, INotRemotedActor
Expand Down Expand Up @@ -147,6 +150,16 @@ public Task<string> SingleArgumentWithCancellationAsync(bool arg, CancellationTo
{
return Task.FromResult(nameof(SingleArgumentWithCancellationAsync));
}

public Task<string> MultiArgumentsAsync(bool arg1, string arg2)
{
return Task.FromResult(nameof(MultiArgumentsAsync));
}

public Task<string> MultiArgumentsWithCancellationAsync(bool arg1, string arg2, CancellationToken cancellationToken = default)
{
return Task.FromResult(nameof(MultiArgumentsWithCancellationAsync));
}
}

public async Task<string> InvokeMethod<T>(string methodName, object arg = null) where T : Actor
Expand All @@ -167,7 +180,7 @@ public async Task<string> InvokeMethod<T>(string methodName, object arg = null)
}

using var output = new MemoryStream();

await runtime.DispatchWithoutRemotingAsync(typeof(T).Name, ActorId.CreateRandom().ToString(), methodName, input, output);

output.Seek(0, SeekOrigin.Begin);
Expand All @@ -179,17 +192,17 @@ public async Task<string> InvokeMethod<T>(string methodName, object arg = null)
public async Task NoRemotingMethodWithNoArguments()
{
string methodName = nameof(INotRemotedActor.NoArgumentsAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName);

Assert.Equal(methodName, result);
Assert.Equal(methodName, result);
}

[Fact]
public async Task NoRemotingMethodWithNoArgumentsWithCancellation()
{
string methodName = nameof(INotRemotedActor.NoArgumentsWithCancellationAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName);

Assert.Equal(methodName, result);
Expand All @@ -199,7 +212,7 @@ public async Task NoRemotingMethodWithNoArgumentsWithCancellation()
public async Task NoRemotingMethodWithSingleArgument()
{
string methodName = nameof(INotRemotedActor.SingleArgumentAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName, true);

Assert.Equal(methodName, result);
Expand All @@ -209,12 +222,34 @@ public async Task NoRemotingMethodWithSingleArgument()
public async Task NoRemotingMethodWithSingleArgumentWithCancellation()
{
string methodName = nameof(INotRemotedActor.SingleArgumentWithCancellationAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName, true);

Assert.Equal(methodName, result);
}

[Fact]
public async Task NoRemotingMethodWithMultiArguments()
{
var arg = new { arg1 = true, arg2 = "abc" };
string methodName = nameof(INotRemotedActor.MultiArgumentsAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName, arg);

Assert.Equal(methodName, result);
}

[Fact]
public async Task NoRemotingMethodWithMultiArgumentsWithCancellation()
{
var arg = new { arg1 = true, arg2 = "abc" };
string methodName = nameof(INotRemotedActor.MultiArgumentsWithCancellationAsync);

string result = await InvokeMethod<NotRemotedActor>(methodName, arg);

Assert.Equal(methodName, result);
}

[Fact]
public async Task Actor_UsesCustomActivator()
{
Expand Down Expand Up @@ -330,7 +365,7 @@ public async Task TestActorSettingsWithRemindersStoragePartitions()
}

[Fact]
public async Task TestActorSettingsWithReentrancy()
public async Task TestActorSettingsWithReentrancy()
{
var actorType = typeof(TestActor);

Expand Down Expand Up @@ -465,7 +500,7 @@ private class TestActivator : DefaultActorActivator

public override Task<ActorActivatorState> CreateAsync(ActorHost host)
{
CreateCallCount++;;
CreateCallCount++; ;
return base.CreateAsync(host);
}

Expand Down
Loading