Skip to content

Commit

Permalink
Merge branch 'master' into actor_ttl
Browse files Browse the repository at this point in the history
Signed-off-by: artur-ciocanu <[email protected]>
  • Loading branch information
artur-ciocanu authored Mar 2, 2025
2 parents 4ff319c + efce229 commit 15fe5b3
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.spring.boot.autoconfigure.client;

import io.dapr.actors.client.ActorClient;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.config.Properties;
Expand Down Expand Up @@ -70,6 +72,20 @@ DaprWorkflowClient daprWorkflowClient(DaprConnectionDetails daprConnectionDetail
return new DaprWorkflowClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorClient daprActorClient(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return new ActorClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorRuntime daprActorRuntime(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return ActorRuntime.getInstance(properties);
}

@Bean
@ConditionalOnMissingBean
WorkflowRuntimeBuilder daprWorkflowRuntimeBuilder(DaprConnectionDetails daprConnectionDetails) {
Expand Down
22 changes: 2 additions & 20 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.Version;
import io.dapr.v1.DaprGrpc;
import io.grpc.Channel;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties),
this(NetworkUtils.buildGrpcManagedChannel(overrideProperties),
metadata,
resiliencyOptions,
overrideProperties.getValue(Properties.API_TOKEN));
Expand Down Expand Up @@ -129,25 +130,6 @@ public void close() {
}
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @param overrideProperties Overrides
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel(Properties overrideProperties) {
int port = overrideProperties.getValue(Properties.GRPC_PORT);
if (port <= 0) {
throw new IllegalArgumentException("Invalid port.");
}

var sidecarHost = overrideProperties.getValue(Properties.SIDECAR_IP);

return ManagedChannelBuilder.forAddress(sidecarHost, port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* Build an instance of the Client based on the provided setup.
Expand Down
103 changes: 55 additions & 48 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Version;
import io.dapr.utils.NetworkUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.io.Closeable;
Expand Down Expand Up @@ -80,23 +79,32 @@ public class ActorRuntime implements Closeable {
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime() throws IllegalStateException {
this(buildManagedChannel());
this(new Properties());
}

/**
* The default constructor. This should not be called directly.
*
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime(Properties properties) throws IllegalStateException {
this(NetworkUtils.buildGrpcManagedChannel(properties));
}

/**
* Constructor once channel is available. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @throws IllegalStateException If cannot instantiate Runtime.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime(ManagedChannel channel) throws IllegalStateException {
this(channel, buildDaprClient(channel));
this(channel, new DaprClientImpl(channel));
}

/**
* Constructor with dependency injection, useful for testing. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @param channel GRPC managed channel to be closed (or null).
* @param daprClient Client to communicate with Dapr.
* @throws IllegalStateException If class has one instance already.
*/
Expand Down Expand Up @@ -128,6 +136,24 @@ public static ActorRuntime getInstance() {
return instance;
}

/**
* Returns an ActorRuntime object.
*
* @param properties Properties to be used for the runtime.
* @return An ActorRuntime object.
*/
public static ActorRuntime getInstance(Properties properties) {
if (instance == null) {
synchronized (ActorRuntime.class) {
if (instance == null) {
instance = new ActorRuntime(properties);
}
}
}

return instance;
}

/**
* Gets the Actor configuration for this runtime.
*
Expand All @@ -149,24 +175,22 @@ public byte[] serializeConfig() throws IOException {

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer} and {@link DefaultActorFactory}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz) {
registerActor(clazz, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory<T> actorFactory) {
registerActor(clazz, actorFactory, new DefaultObjectSerializer(), new DefaultObjectSerializer());
Expand All @@ -181,8 +205,8 @@ public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
}

/**
Expand All @@ -195,9 +219,9 @@ public <T extends AbstractActor> void registerActor(
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
if (clazz == null) {
throw new IllegalArgumentException("Class is required.");
}
Expand All @@ -216,12 +240,12 @@ public <T extends AbstractActor> void registerActor(
// Create ActorManager, if not yet registered.
this.actorManagers.computeIfAbsent(actorTypeInfo.getName(), (k) -> {
ActorRuntimeContext<T> context = new ActorRuntimeContext<>(
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this.config.addRegisteredActorType(actorTypeInfo.getName());
return new ActorManager<T>(context);
});
Expand All @@ -236,7 +260,7 @@ public <T extends AbstractActor> void registerActor(
*/
public Mono<Void> deactivate(String actorTypeName, String actorId) {
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
}

/**
Expand All @@ -252,8 +276,8 @@ public Mono<Void> deactivate(String actorTypeName, String actorId) {
public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeMethod(id, actorMethodName, payload));
}

/**
Expand All @@ -268,8 +292,8 @@ public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMet
public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeReminder(new ActorId(actorId), reminderName, params));
}

/**
Expand All @@ -284,8 +308,8 @@ public Mono<Void> invokeReminder(String actorTypeName, String actorId, String re
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeTimer(new ActorId(actorId), timerName, params));
}

/**
Expand Down Expand Up @@ -318,23 +342,6 @@ private static DaprClient buildDaprClient(ManagedChannel channel) {
return new DaprClientImpl(channel);
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}

return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* {@inheritDoc}
*/
Expand Down
107 changes: 107 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprActorsIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.testcontainers;

import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorClient;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.testcontainers.Component;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest(
webEnvironment = WebEnvironment.RANDOM_PORT,
classes = {
TestActorsApplication.class,
TestDaprActorsConfiguration.class
}
)
@Testcontainers
@Tag("testcontainers")
public class DaprActorsIT {
private static final Network DAPR_NETWORK = Network.newNetwork();
private static final Random RANDOM = new Random();
private static final int PORT = RANDOM.nextInt(1000) + 8000;

private static final String ACTORS_MESSAGE_PATTERN = ".*Actor API level in the cluster has been updated to 10.*";

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.14.4")
.withAppName("actor-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("kvstore", "state.in-memory", "v1",
Map.of("actorStateStore", "true")))
.withDaprLogLevel(DaprLogLevel.DEBUG)
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withAppChannelAddress("host.testcontainers.internal")
.withAppPort(PORT);

/**
* Expose the Dapr ports to the host.
*
* @param registry the dynamic property registry
*/
@DynamicPropertySource
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
registry.add("server.port", () -> PORT);
}

@Autowired
private ActorClient daprActorClient;

@Autowired
private ActorRuntime daprActorRuntime;

@BeforeEach
public void setUp(){
org.testcontainers.Testcontainers.exposeHostPorts(PORT);
daprActorRuntime.registerActor(TestActorImpl.class);
// Ensure the subscriptions are registered
Wait.forLogMessage(ACTORS_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER);
}

@Test
public void testActors() {
ActorProxyBuilder<TestActor> builder = new ActorProxyBuilder<>(TestActor.class, daprActorClient);
ActorId actorId = ActorId.createRandom();
TestActor actor = builder.build(actorId);

String message = UUID.randomUUID().toString();

String echoedMessage = actor.echo(message);

assertEquals(echoedMessage, message);
}
}
Loading

0 comments on commit 15fe5b3

Please sign in to comment.