Actors 为最低级别的“计算单元”
以上解释来自官方文档,看起来“晦涩难懂”。大白话就是说Actors模式是一段需要单线程执行的代码块。
实际开发中我们经常会有一些逻辑不能并发执行,我们常用的做法就是加锁,例如:
lock(obj) { //dosomething... }
或者用Redis等中间件,为分布式应用加一些分布式锁。遗憾的是,使用显式锁定机制容易出错。 它们很容易导致死锁,并可能对性能产生严重影响。Actors模式为单线程逻辑提供了一种更好的选择。
什么时候用Actors
Dapr启动app时,Sidecar调用Actors获取配置信息,之后Sidecar将Actors的信息发送到安置服务(Placement Service),安置服务会将不同的Actor类型根据其Id和Actor类型分区,并将Actor信息广播到所有dapr实例。
在客户端调用某个Actor时,安置服务会根据其Id和Actor类型,找到其所在的dapr实例,并执行其方法。
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
<actorType>
:执行组件类型。<actorId>
:要调用的特定参与者的 ID。<method>
:要调用的方法Actor可以设置timer和reminder设置执行Actor的时间,有点像我们常用的定时任务。但是timer和reminder也存在不同。
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
到期时间(due time)表示注册后 timer 将首次触发的时间。 period
表示timer在此之后触发的频率。 到期时间为0表示立即执行。 负 due times 和负 periods 都是无效。
下面的请求体配置了一个 timer, dueTime
9秒, period
3秒。 这意味着它将在9秒后首次触发,然后每3秒触发一次。
{ "dueTime":"0h0m9s0ms", "period":"0h0m3s0ms" }
下面的请求体配置了一个 timer, dueTime
0秒, period
3秒。 这意味着它将在注册之后立即触发,然后每3秒触发一次。
{ "dueTime":"0h0m0s0ms", "period":"0h0m3s0ms" }
POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
到期时间(due time)表示注册后 reminders将首次触发的时间。 period
表示在此之后 reminders 将触发的频率。 到期时间为0表示立即执行。 负 due times 和负 periods 都是无效。 若要注册仅触发一次的 reminders ,请将 period 设置为空字符串。
下面的请求体配置了一个 reminders, dueTime
9秒, period
3秒。 这意味着它将在9秒后首次触发,然后每3秒触发一次。
{ "dueTime":"0h0m9s0ms", "period":"0h0m3s0ms" }
下面的请求体配置了一个 reminders, dueTime
0秒, period
3秒。 这意味着它将在注册之后立即触发,然后每3秒触发一次。
{ "dueTime":"0h0m0s0ms", "period":"0h0m3s0ms" }
下面的请求体配置了一个 reminders, dueTime
15秒, period
空字符串。 这意味着它将在15秒后首次触发,之后就不再被触发。
{ "dueTime":"0h0m15s0ms", "period":"" }
使用 Dapr 状态管理构建块保存执行组件状态。 由于执行组件可以一轮执行多个状态操作,因此状态存储组件必须支持多项事务。 撰写本文时,以下状态存储支持多项事务:
若要配置要与执行组件一起使用的状态存储组件,需要将以下元数据附加到状态存储配置:
- name: actorStateStore value: "true"
win10自承载模式下已默认设置此项 C:\Users\<username>\.dapr\components\statestore.yaml
下面将通过一个审核流程的例子来演示。
还是用前面的FrontEnd项目,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore。
定义IOrderStatusActor接口,需要继承自IActor
using Dapr.Actors; using System.Threading.Tasks; namespace FrontEnd.ActorDefine { public interface IOrderStatusActor : IActor { Task<string> Paid(string orderId); Task<string> GetStatus(string orderId); } }
定义OrderStatusActor实现IOrderStatusActor,并继承自Actor
using Dapr.Actors.Runtime; using System.Threading.Tasks; namespace FrontEnd.ActorDefine { public class OrderStatusActor : Actor, IOrderStatusActor { public OrderStatusActor(ActorHost host) : base(host) { } public async Task<string> Paid(string orderId) { // change order status to paid await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid"); return orderId; } public async Task<string> GetStatus(string orderId) { return await StateManager.GetStateAsync<string>(orderId); } } }
需要注意的是,执行组件方法的返回类型必须为 Task 或 Task<T> 。 此外,执行组件方法最多只能有一个参数。 返回类型和参数都必须可 System.Text.Json 序列化。
Actor的api是必需的,因为 Dapr 挎斗调用应用程序来承载和与执行组件实例进行交互,所以在Startup的Configure中配置
app.UseEndpoints(endpoints => { endpoints.MapActorsHandlers(); // ....... });
Startup
类也是用于注册特定执行组件类型的位置。 在ConfigureServices
ScoreActor
使用注册 services.AddActors
:
services.AddActors(options => { options.Actors.RegisterActor<OrderStatusActor>(); });
为测试这个Actor,需要定义一个接口调用,新增ActorController
using Dapr.Actors; using Dapr.Actors.Client; using FrontEnd.ActorDefine; using Microsoft.AspNetCore.Mvc; using System.Threading.Tasks; namespace FrontEnd.Controllers { [Route("[controller]")] [ApiController] public class ActorController : ControllerBase { [HttpGet("paid/{orderId}")] public async Task<ActionResult> PaidAsync(string orderId) { var actorId = new ActorId(orderId); var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor"); var result = await proxy.Paid(orderId); return Ok(result); } } }
ActorProxy.Create
为创建代理实例。 Create
方法采用两个参数:标识特定执行组件和执行组件 ActorId
类型。 它还具有一个泛型类型参数,用于指定执行组件类型所实现的执行组件接口。 由于服务器和客户端应用程序都需要使用执行组件接口,它们通常存储在单独的共享项目中。
下面通过postman测试下,调用成功
查看redis中的数据
127.0.0.1:6379> keys * 1) "test_topic" 2) "frontend||guid" 3) "frontend||name" 5) "newOrder" 6) "frontend||OrderStatusActor||myid-123||123" 7) "myapp2||key2" 8) "myapp2||key1" 9) "deathStarStatus" 10) "myapp||name" 127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123 1) "data" 2) "\"init\"" 3) "version" 4) "1"
可以发现actor数据的命名规则是appName||ActorName||ActorId||key
同样可以使用注入的方式创建proxy,ActorController中注入IActorProxyFactory
private readonly IActorProxyFactory _actorProxyFactory; public ActorController(IActorProxyFactory actorProxyFactory) { _actorProxyFactory = actorProxyFactory; }
新增获取数据接口
[HttpGet("get/{orderId}")] public async Task<ActionResult> GetAsync(string orderId) { var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>( new ActorId("myid-" + orderId), "OrderStatusActor"); return Ok(await proxy.GetStatus(orderId)); }
postman测试
使用Actor基类的 RegisterTimerAsync
方法计划计时器。在OrderStatusActor类中新增方法
public Task StartTimerAsync(string name, string text) { return RegisterTimerAsync( name, nameof(TimerCallbackAsync), Encoding.UTF8.GetBytes(text), TimeSpan.Zero, TimeSpan.FromSeconds(3)); } public Task TimerCallbackAsync(byte[] state) { var text = Encoding.UTF8.GetString(state); _logger.LogInformation($"Timer fired: {text}"); return Task.CompletedTask; }
StartTimerAsync
方法调用 RegisterTimerAsync
来计划计时器。 RegisterTimerAsync
采用五个参数:
TimeSpan.FromMilliseconds(-1)
禁用定期信号。在OrderStatusActor构造方法中调用StartTimerAsync
StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();
通过调用paid接口实例化一个Actor,即可开启timer
查看控制台,timer触发成功
== APP == info: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == Timer fired: this is a test timer
TimerCallbackAsync
方法以二进制形式接收用户状态。 在示例中,回调在将状态写入日志之前将状态 string
解码回 。
可以通过调用 来停止计时器 UnregisterTimerAsync
:
public Task StopTimerAsync(string name) { return UnregisterTimerAsync(name); }
使用Actor基类的 RegisterReminderAsync 方法计划计时器。在OrderStatusActor类中新增方法
public Task SetReminderAsync(string text) { return RegisterReminderAsync( "test-reminder", Encoding.UTF8.GetBytes(text), TimeSpan.Zero, TimeSpan.FromSeconds(1)); } public Task ReceiveReminderAsync( string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period) { if (reminderName == "test-reminder") { var text = Encoding.UTF8.GetString(state); Logger.LogWarning($"reminder fired: {text}"); } return Task.CompletedTask; }
RegisterReminderAsync
方法类似于 RegisterTimerAsync
,但不必显式指定回调方法。 如上面的示例所示,实现 IRemindable.ReceiveReminderAsync
以处理触发的提醒。
public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable
ReceiveReminderAsync
触发提醒时调用 方法。 它采用 4 个参数:
在OrderStatusActor构造方法中调用SetReminderAsync
SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();
通过调用paid接口实例化一个Actor,即可开启reminder
查看控制台,reminder触发成功
== APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == reminder fired: this is a test reminder