一、基本关系
1、单体服务
2、微服务
3、创建流程
二、DefaultActorService
DefaultActorService
是整个规则引擎的初始化入口。
DefaultActorService
会初始化一个使用actor模型的规则引擎,共分为2步:
- ①创建actorSystem;
- ②处理应用初始化完成事件
1、创建actorSystem
1、创建appActor
@PostConstruct
public void initActorSystem() {
log.info("Initializing actor system.");
actorContext.setActorService(this);
TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
system = new DefaultTbActorSystem(settings);
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
actorContext.setActorSystem(system);
// 创建appActor,全局唯一
appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
actorContext.setAppActor(appActor);
TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
actorContext.setStatsActor(statsActor);
log.info("Actor system initialized.");
}
DefaultTbActorSystem.createActor
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
Dispatcher dispatcher = dispatchers.get(dispatcherId);
if (dispatcher == null) {
log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
}
TbActorId actorId = creator.createActorId();
TbActorMailbox actorMailbox = actors.get(actorId);
if (actorMailbox != null) {
log.debug("Actor with id [{}] is already registered!", actorId);
} else {
Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
actorCreationLock.lock();
try {
actorMailbox = actors.get(actorId);
if (actorMailbox == null) {
log.debug("Creating actor with id [{}]!", actorId);
// appActor类型
TbActor actor = creator.createActor();
TbActorRef parentRef = null;
if (parent != null) {
parentRef = getActor(parent);
if (parentRef == null) {
throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
}
}
TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
actors.put(actorId, mailbox);
// 最后会调用appActor.init方法
mailbox.initActor();
actorMailbox = mailbox;
if (parent != null) {
parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
}
} else {
log.debug("Actor with id [{}] is already registered!", actorId);
}
} finally {
actorCreationLock.unlock();
actorCreationLocks.remove(actorId);
}
}
return actorMailbox;
}
appActor.init()
在初始化appActor的时候会启动一个定时任务,去定时清除掉过期的sessionInfo。后面在做扩展的时候需要注意,sessionInfo长时间不用可能会被清除。
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE)) {
systemContext.schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(),
systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
}
}
2、创建stasActor
用于统计状态,创建过程与appActor相同。
== 注:appActor和stasActor全局唯一 ==
2、处理应用初始化完成事件
在应用初始化结束之后会接收到spring发送的ApplicationReadyEvent
事件,会向appActor中发送一个AppInitMsg消息。然后appActor会为每一个租户初始化一个tanentActor和ruleChain、ruleNode。
@AfterStartUp(order = AfterStartUp.ACTOR_SYSTEM)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
log.info("Received application ready event. Sending application init message to actor system");
appActor.tellWithHighPriority(new AppInitMsg());
}
appActor.doProcess()
@Override
protected boolean doProcess(TbActorMsg msg) {
log.info("app actor s msg s msg type {}", msg.getMsgType());
if (!ruleChainsInitialized) {
if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
// 初始化租户的actor
initTenantActors();
ruleChainsInitialized = true;
} else {
if (!msg.getMsgType().isIgnoreOnStart()) {
log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
}
return true;
}
}
// ...
}
appActor.initTenantActors()
private void initTenantActors() {
log.info("Starting main system actor.");
try {
if (systemContext.isTenantComponentsInitEnabled()) {
// 查询所有租户
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
log.debug("[{}] Creating tenant actor", tenant.getId());
// 为每一个租户初始化一个actor
getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> {
log.debug("[{}] Tenant actor created.", tenant.getId());
}, () -> {
log.debug("[{}] Skipped actor creation", tenant.getId());
});
}
}
log.info("Main system actor started.");
} catch (Exception e) {
log.warn("Unknown failure", e);
}
}
appActor.getOrCreateTenantActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。
传入的actorId是tanentId,ActorCreator是TanentActor.ActorCreator(会传入tenantId,后面初始化ruleChain时会用到)。
在DefaultTbActorSystem.createActor()方法中TbActorMailbox中的actor就是TenantActor.class类型,所以在mailbox.initActor();方法中会调用tenantActor.init()方法进行初始化。
tenantActor.init()
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.debug("[{}] Starting tenant actor.", tenantId);
try {
Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
if (tenant == null) {
cantFindTenant = true;
log.info("[{}] Started tenant actor for missing tenant.", tenantId);
} else {
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
if (isRuleEngine) {
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
try {
if (getApiUsageState().isReExecEnabled()) {
log.debug("[{}] Going to init rule chains", tenantId);
// 初始化ruleChain
initRuleChains();
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
}
} catch (Exception e) {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true;
}
} else {
log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId);
}
}
log.debug("[{}] Tenant actor started.", tenantId);
}
} catch (Exception e) {
log.warn("[{}] Unknown failure", tenantId, e);
}
}
在tenantActor初始化时会去初始化该租户下的ruleChain。
RuleChainManagerActor.initRuleChains()
protected void initRuleChains() {
log.debug("[{}] Initializing rule chains", tenantId);
// 查询该租户下所有的ruleChain
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
// 初始化每一个ruleChain
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
ruleChainsInitialized = true;
}
RuleChainManagerActor.getOrCreateActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。
DefaultTbActorSystem.createActor()
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
Dispatcher dispatcher = dispatchers.get(dispatcherId);
if (dispatcher == null) {
log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
}
TbActorId actorId = creator.createActorId();
TbActorMailbox actorMailbox = actors.get(actorId);
if (actorMailbox != null) {
log.debug("Actor with id [{}] is already registered!", actorId);
} else {
Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
actorCreationLock.lock();
try {
actorMailbox = actors.get(actorId);
if (actorMailbox == null) {
log.debug("Creating actor with id [{}]!", actorId);
// RuleChainActor.class类型
TbActor actor = creator.createActor();
TbActorRef parentRef = null;
if (parent != null) {
parentRef = getActor(parent);
if (parentRef == null) {
throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
}
}
TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
actors.put(actorId, mailbox);
// 会调用RuleChainActor.init()方法,但是RuleChainActor没有重写init方法,而RuleChainActor继承ComponentActor,所以会调用ComponentActor.init()方法
mailbox.initActor();
actorMailbox = mailbox;
if (parent != null) {
parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
}
} else {
log.debug("Actor with id [{}] is already registered!", actorId);
}
} finally {
actorCreationLock.unlock();
actorCreationLocks.remove(actorId);
}
}
return actorMailbox;
}
ComponentActor.init()
@Override
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
// createProcessor会返回一个RuleChainActorMessageProcessor类型
this.processor = createProcessor(ctx);
// 这个方法里面会去初始化ruleNode
initProcessor(ctx);
}
RuleChainActor.createProcessor()
@Override
protected RuleChainActorMessageProcessor createProcessor(TbActorCtx ctx) {
return new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext,
ctx.getParentRef(), ctx);
}
ComponentActor.initProcessor()
protected void initProcessor(TbActorCtx ctx) throws TbActorException {
try {
log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
// 这个方法里面回去初始化ruleNode
processor.start(ctx);
logLifecycleEvent(ComponentLifecycleEvent.STARTED);
if (systemContext.isStatisticsEnabled()) {
scheduleStatsPersistTick();
}
} catch (Exception e) {
log.debug("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType(), e);
logAndPersist("OnStart", e, true);
logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
throw new TbActorException("Failed to init actor", e);
}
}
RuleChainActorMessageProcessor.start()
@Override
public void start(TbActorCtx context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {
// 找到租户下的ruleChain
List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
log.debug("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
// 创建ruleNode
TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
// 缓存
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
started = true;
}
} else {
onUpdate(context);
}
}
- createRuleNodeActor方法最终也会调用到
DefaultTbActorSystem.createActor
,而actor的类型是RuleNodeActor.class
。 - 所以会调用ruleNodeActor.init方法,而ruleNodeActor继承自ComponentActor,所以也会调用到ComponentActor中的init方法,
- 跟ruleChainActor一样,最后会调用到ruleNodeActor中的createProcessor方法,返回值是一个RuleNodeActorMessageProcessor类型,
ComponentActor.initProcessor
中的start方法就会调用到RuleNodeActorMessageProcessor.start
方法
RuleNodeActorMessageProcessor.start
@Override
public void start(TbActorCtx context) throws Exception {
if (isMyNodePartition()) {
log.debug("[{}][{}] Starting", tenantId, entityId);
// 初始化node节点
tbNode = initComponent(ruleNode);
if (tbNode != null) {
state = ComponentLifecycleState.ACTIVE;
}
}
}
private TbNode initComponent(RuleNode ruleNode) throws Exception {
TbNode tbNode = null;
if (ruleNode != null) {
Class<?> componentClazz = Class.forName(ruleNode.getType());
tbNode = (TbNode) (componentClazz.getDeclaredConstructor().newInstance());
// 具体每个node的init方法
tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));
}
return tbNode;
}
三、DeviceActor
deviceActor不是在server启动时创建的,而是当设备上来数据的时候才会去初始化。
具体是在TenantActor.onToDeviceActorMsg
中。
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
if (!isCore) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
}
if (deletedDevices.contains(msg.getDeviceId())) {
log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
return;
}
// 创建deviceActor
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
if (priority) {
deviceActor.tellWithHighPriority(msg);
} else {
deviceActor.tell(msg);
}
}
四、代码调试思路
因为整个server中Actor是核心,所以就从actor入手。
public interface TbActor {
boolean process(TbActorMsg msg);
TbActorRef getActorRef();
default void init(TbActorCtx ctx) throws TbActorException {
}
default void destroy(TbActorStopReason stopReason, Throwable cause) throws TbActorException {
}
default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
return InitFailureStrategy.retryWithDelay(5000L * attempt);
}
default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
if (t instanceof Error) {
return ProcessFailureStrategy.stop();
} else {
return ProcessFailureStrategy.resume();
}
}
}
在TbActor中,很明显init是初始化的方法,肯定会在初始化的时候调用,找到其中的一个实现如appActor.init(),find useages找到调用appActor.init的地方,发现只有一个地方在调用:TbActorMailbox.tryInit,在沿着往上找,就可以找到DefaultActorService.initActorSystem()
方法。