- 1 Overview
- 2 Basic Abilities
- 3 Flow Description Language
- 4 Asynchronous
- 5 Interceptors
- 6 Data Statistics
- 7 Spring-Boot-Starter
- 8 Non-Spring Environment
The flow-engine aims to provide orchestration capabilities for functional nodes. The following smart-home example gives a brief introduction to its use: Mr. Wang has a smart thermometer and a smart air conditioner connected to an IoT platform. The thermometer reports the temperature every minute. The required behavior is: when the temperature exceeds 30 degrees Celsius, turn on the air conditioner in cooling mode; when the temperature drops below 10 degrees Celsius, turn on the air conditioner in heating mode; for other temperatures, turn off the air conditioner. We might write the following logic:
if (temperature > 30) {
control(airCondition, open, coldMode)
} else if (temperature < 10) {
control(airCondition, open, heatMode)
} else {
control(airCondition, close)
}
Later, Mr. Wang added a smart floor heating system and wanted to turn on the floor heating instead of the air conditioner when the temperature drops below 10 degrees Celsius. At this point, we have to modify the code and redeploy it, which is very cumbersome.
if (temperature > 30) {
control(airCondition, open, coldMode)
} else if (temperature < 10) {
control(floorHeating, open)
} else {
control(airCondition, close)
control(floorHeating, close)
}
With the help of flow-engine, we can separate the temperature judgment and the device control into two independent functional nodes and use the flow description language of flow-engine to orchestrate them. Changes can then be deployed in real time through the interfaces provided by flow-engine.
The following sections introduce the basic concepts and usage of flow-engine.
- Rich atomic capabilities
  - Action
  - Condition
  - Gateway
    - JoinGateway
      - hard
      - soft
      - or
    - ExclusiveGateway
  - Listener
    - event
      - before
      - success
      - failure
    - scope
      - node
      - global
  - SubFlow
- Concise flow description language
  - Cascade
  - Parallel
  - Aggregation
  - Listener
  - if statement
  - if else statement
  - select statement
  - join statement
  - join then statement
  - sub statement
  - sub then statement
  - sub then else statement
- Rich mechanisms
  - Promise
  - PromiseListener
  - DelegateInterceptor
  - DelegateField
  - Pause and resume of ExecutionLink
  - Thread pool isolation
  - Asynchronous execution timeout setting
- Integration with Spring-Boot-Starter
- Data statistics
  - Execution link
  - Variable update records
  - Node execution time
  - Attribute
- Element: Defines the basic capabilities of elements in the flow, including
  - id: The unique id at the flow level
  - type: The type of the element, see ElementType for details
  - flow: The associated flow itself
- Attachable: Can be attached to a node, including
  - attachedId: The id of the attached node
- Executable: An element that can execute Java logic, including
  - name: The name of the element
  - argumentNames: The names of the arguments
  - argumentValues: The values of the arguments
- Node: Defines nodes in the flow. Nodes are the smallest topological units in the flow, including
  - predecessors: Predecessor collection
  - successors: Successor collection
  - listeners: Listener collection
- Conditional: Nodes in the flow that can be followed by conditional nodes, including
  - linktype: Link type, including NORMAL, TRUE, and FALSE
- Listener: Listener, including
  - scope: Listener scope, including global and node
  - event: Event type, including before, success, and failure
- Activity: The definition of this concept is referenced from flowable
  - Action: A normal execution node
  - Condition: A conditional execution node
- Gateway: Gateway node
  - ExclusiveGateway: Exclusive gateway, only allows the first Condition that satisfies the condition to pass
  - JoinGateway: Join gateway, aggregates multiple execution branches into one branch
    - joinMode: Aggregation mode, including hard mode, soft mode, and or mode
      - hard means that all predecessor nodes must be executed normally for the join to pass
      - soft means that all reachable predecessor nodes must be executed normally for the join to pass (in other words, unreachable branches are allowed to exist)
      - or means that the join can pass if any predecessor node is executed normally
- Flow: The flow itself can also exist as a node in the top-level flow topology (i.e., sub-flow)
In the flow engine, the connections between nodes are called Links, and there are three types of links:
- LinkType.TRUE: True branch
- LinkType.FALSE: False branch
- LinkType.NORMAL: Default branch
The links between a Condition and its subsequent nodes are of type LinkType.TRUE or LinkType.FALSE. For all other node types, the links to subsequent nodes are of type LinkType.NORMAL. Please refer to the following diagram for an example:
In the following text, we refer to the connection between a node and its subsequent node as a successor Link.
The Node is the most basic element in the flow topology. Other elements, such as Listener, can only exist attached to a Node. Node includes:
- Action
- Condition
- JoinGateway
- ExclusiveGateway
- Flow
Executable is an element that can execute Java code, including Action, Condition, and Listener.
Registration
- In a non-Spring environment, registration must be done through the static methods of com.github.liuyehcf.framework.flow.engine.FlowEngine:
  - registerActionDelegateFactory
  - registerConditionDelegateFactory
  - registerListenerDelegateFactory
- In a Spring environment, no configuration is required. The flow engine completes the registration automatically for beans marked with:
  - @Component: the node name is the name of the bean.
  - @ActionBean: this annotation is itself marked with @Component. The node name is specified by names and can have multiple aliases.
  - @ConditionBean: this annotation is itself marked with @Component. The node name is specified by names and can have multiple aliases.
  - @ListenerBean: this annotation is itself marked with @Component. The node name is specified by names and can have multiple aliases.
Naming
- Identifier, such as actionA, action1, _action_2
- Identifiers joined with ., such as my.test.condition3
- Identifiers joined with /, such as my/test/listener5
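The naming rules above can be approximated with a regular expression. This is only a sketch: the class NodeNameSketch and the method isValidName are hypothetical, and the exact identifier grammar (first character a letter or underscore, separators not mixed within one name) is an assumption inferred from the three examples, not taken from the engine's parser.

```java
import java.util.regex.Pattern;

public class NodeNameSketch {
    // Assumption: an identifier starts with a letter or underscore,
    // followed by letters, digits, or underscores.
    private static final String ID = "[A-Za-z_][A-Za-z0-9_]*";

    // A single identifier, or identifiers joined only by '.', or only by '/'.
    private static final Pattern NAME =
            Pattern.compile(ID + "(?:(?:\\." + ID + ")+|(?:/" + ID + ")+)?");

    public static boolean isValidName(String name) {
        return name != null && NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"actionA", "_action_2", "my.test.condition3", "my/test/listener5", "1bad"};
        for (String sample : samples) {
            System.out.println(sample + " -> " + isValidName(sample));
        }
    }
}
```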
Action is a normal node that can execute Java code. Action only allows successor Links of type LinkType.NORMAL. The inheritance relationship is as follows:
To create an Action that can execute business logic, you only need to implement the ActionDelegate interface.
package com.github.liuyehcf.framework.flow.engine.runtime.delegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ActionContext;
/**
* @author hechenfeng
* @date 2019/4/27
*/
public interface ActionDelegate extends Delegate {
/**
* method invoke when action is reached
*
* @param context context of execution
* @throws Exception exceptions
*/
void onAction(ActionContext context) throws Exception;
}
The context can be used to
- Get property values
- Set property values (Setting property values through this method will be recorded by the engine and will be applied to aggregation logic. The change information will be saved in the execution Trace)
- Get environment variables (Directly manipulate environment variables to set property values, which will not be recorded by the engine. Use with caution)
- Add attributes (Add property values for storing business data, isolated from environment variables)
Condition is a conditional node that can execute Java code. Condition only allows successor Links of type LinkType.TRUE or LinkType.FALSE.
To create a Condition that can execute business logic, you only need to implement the ConditionDelegate interface. The return value of the onCondition method determines the direction of the flow. If true is returned, the LinkType.TRUE branch is taken. If false is returned, the LinkType.FALSE branch is taken.
package com.github.liuyehcf.framework.flow.engine.runtime.delegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ConditionContext;
/**
* @author hechenfeng
* @date 2019/4/27
*/
public interface ConditionDelegate extends Delegate {
/**
* method invoke when condition is reached
*
* @param context context of execution
* @throws Exception exceptions
*/
boolean onCondition(ConditionContext context) throws Exception;
}
The context can be used to
- Get property values
- Set property values (Setting property values through this method will be recorded by the engine and will be applied to aggregation logic. The change information will be saved in the execution Trace)
- Get environment variables (Directly manipulate environment variables to set property values, which will not be recorded by the engine. Use with caution)
- Add attributes (Add property values for storing business data, isolated from environment variables)
Listener is not a node in the flow topology; it must be attached to a node. It can be attached to:
- Flow (scope: global/node)
  - node listeners can only be configured for a Sub Flow
  - global listeners can be configured for a Flow or a Sub Flow
- Activity (scope: node)
  - Action
  - Condition
- Gateway (scope: node)
  - JoinGateway
  - ExclusiveGateway
event: Listener triggers
- before: Triggered when the execution link has just reached the node to which the listener is attached (the attached node has not yet executed its logic)
- success: Triggered when the node to which the listener is attached is executed successfully
  - For nodes such as Action and Condition, the success listener is triggered only when the node execution does not throw an exception
  - Nodes such as JoinGateway and ExclusiveGateway contain no execution logic, which means they cannot throw exceptions. Therefore, the success listener is always triggered.
- failure: Triggered when the attached node throws an exception
  - This type of listener can only observe exceptional situations; it cannot handle exceptions. In other words, after the listener has run, the exception is still thrown to the upper layer.
  - If the listener's own logic throws an exception, there are two exceptions: the original exception thrown by the attached node (referred to as original-exception) and the exception thrown by the listener (referred to as listener-exception). The exception that is finally thrown is chosen as follows:
    - If the exception thrown by the listener is LinkExecutionTerminateException, the final exception is the original-exception.
    - Otherwise, the final exception is the listener-exception.
scope: Scope of the Listener
- node: Node-level listener, which can be attached to Action, Condition, and Gateway
- global: Flow-level listener, which is triggered before the flow executes, after it executes successfully, or when an execution exception occurs
  - If the flow has multiple execution branches, the success/failure listener is still executed only once
  - For example, in the diagram below, if actionA, actionB, and actionC are all executed successfully, the GlobalSuccessListener is executed only once
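The rule for choosing the final exception when a failure listener itself throws can be sketched as a small function. The class ListenerFailureSketch, the method resolveFinalException, and the nested LinkExecutionTerminateException are local stand-ins written for this illustration; they only mirror the rule described above and are not the engine's code.

```java
public class ListenerFailureSketch {
    // Hypothetical stand-in for the engine's LinkExecutionTerminateException.
    public static class LinkExecutionTerminateException extends RuntimeException {
    }

    // Picks the exception that is finally thrown after a failure listener has run.
    public static Throwable resolveFinalException(Throwable originalException,
                                                  Throwable listenerException) {
        if (listenerException == null) {
            // The listener completed normally: the original exception propagates.
            return originalException;
        }
        if (listenerException instanceof LinkExecutionTerminateException) {
            // The listener's terminate exception does not replace the original one.
            return originalException;
        }
        return listenerException;
    }

    public static void main(String[] args) {
        Throwable original = new IllegalStateException("node failed");
        Throwable fromListener = new IllegalArgumentException("listener failed");
        System.out.println(resolveFinalException(original, fromListener).getMessage());
    }
}
```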
To create a Listener that can execute business logic, you only need to implement the ListenerDelegate interface.
package com.github.liuyehcf.framework.flow.engine.runtime.delegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ListenerContext;
/**
* @author hechenfeng
* @date 2019/4/27
*/
public interface ListenerDelegate extends Delegate {
/**
* method invoke when listener is reached
*
* @param context context of execution
* @throws Exception exceptions
*/
default void onBefore(ListenerContext context) throws Exception {
// default implementation
}
/**
* method invoke when bound element' execution succeeded
*
* @param context context of execution
* @throws Exception exceptions
*/
default void onSuccess(ListenerContext context, Object result) throws Exception {
// default implementation
}
/**
* method invoke when bound element' execution failed
*
* @param context context of execution
* @throws Exception exceptions
*/
default void onFailure(ListenerContext context, Throwable cause) throws Exception {
// default implementation
}
}
The context can be used to
- Get property values
- Set property values (Setting property values through this method will be recorded by the engine and will be applied to aggregation logic. The change information will be saved in the execution Trace)
- Get environment variables (Directly manipulate environment variables to set property values, which will not be recorded by the engine. Use with caution)
- Add attributes (Add property values for storing business data, isolated from environment variables)
ActionDelegate, ConditionDelegate, and ListenerDelegate all inherit the Delegate interface. In this section, Delegate is used as a generic term for these three specific interfaces.
If a Delegate needs configurable variables, it must declare them as DelegateField, which the flow engine injects automatically. There are two ways to inject variables:
- set method: This method is preferred and recommended (not affected by Spring AOP).
- Field injection: This method is not recommended. In a Spring environment, if AOP is configured, this method is ineffective, because the fields are injected into the wrapper class.
If a node does not specify a parameter, the DelegateField.getValue method returns null, but the DelegateField itself is not null.
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.ActionDelegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ActionContext;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.field.DelegateField;
/**
* @author hechenfeng
* @date 2019/7/10
*/
public class MyAction implements ActionDelegate {
private DelegateField arg1;
// arg2 has no set method, so it is injected through field reflection
private DelegateField arg2;
// arg1 has a set method, so injection goes through the set method first
public void setArg1(DelegateField arg1) {
this.arg1 = arg1;
}
@Override
public void onAction(ActionContext context) {
// obtain the actual argument values directly through getValue
Object value1 = arg1.getValue();
Object value2 = arg2.getValue();
// ...
}
}
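The injection order described above (set method first, field reflection as a fallback) can be sketched with plain Java reflection. Everything here is an assumption made for illustration: FieldInjectionSketch, its inject method, and the nested DelegateField and MyAction classes are local stand-ins, and the engine's real injection code may differ.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class FieldInjectionSketch {
    // Hypothetical stand-in for the engine's DelegateField.
    public static class DelegateField {
        private final Object value;
        public DelegateField(Object value) { this.value = value; }
        public Object getValue() { return value; }
    }

    // Mirrors MyAction above: arg1 has a setter, arg2 does not.
    public static class MyAction {
        private DelegateField arg1;
        private DelegateField arg2;
        public boolean setterCalled;
        public void setArg1(DelegateField arg1) { this.arg1 = arg1; this.setterCalled = true; }
        public Object value1() { return arg1.getValue(); }
        public Object value2() { return arg2.getValue(); }
    }

    // Prefers a public setter named set<Name>; falls back to the declared field.
    public static void inject(Object target, String name, Object value) {
        DelegateField field = new DelegateField(value);
        String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            try {
                Method method = target.getClass().getMethod(setter, DelegateField.class);
                method.invoke(target, field);
            } catch (NoSuchMethodException noSetter) {
                Field declared = target.getClass().getDeclaredField(name);
                declared.setAccessible(true);
                declared.set(target, field);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot inject " + name, e);
        }
    }

    public static void main(String[] args) {
        MyAction action = new MyAction();
        inject(action, "arg1", "hello");
        inject(action, "arg2", null); // unspecified parameter: the field wraps null
        System.out.println(action.value1() + ", " + action.value2());
    }
}
```

In this sketch an unspecified parameter is modeled by injecting a DelegateField that wraps null, which matches the statement that the field itself is never null.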
When a node's business logic is complex, it is recommended to use a business thread pool to handle the business logic instead of executing it in the flow engine's thread pool.
ActionDelegate, ConditionDelegate, and ListenerDelegate can be set to execute asynchronously using the isAsync method. The timeout can be set using the getAsyncTimeout method. The business thread pool can be set using the getAsyncExecutor method. These methods are defined in the top-level Delegate interface and are non-asynchronous by default.
package com.github.liuyehcf.framework.flow.engine.runtime.delegate;
import java.util.concurrent.ExecutorService;
/**
* @author hechenfeng
* @date 2019/4/27
*/
public interface Delegate {
/**
* whether execution in async mode
* default is sync mode, which means that it will execute in the thread pool of the FlowEngine
*/
default boolean isAsync() {
return false;
}
/**
* thread pool for executing asynchronous logic
* flowEngine's executor will be used if return value is null
* <p>
* invalid when isAsync() is false
*/
default ExecutorService getAsyncExecutor() {
return null;
}
/**
* timeout of async execution, the unit is milliseconds
* non-positive value means wait until execution finished, default is 0
* <p>
* if the execution time exceeds the specified time, an interrupt signal is emitted,
* but the response depends on the business code itself.
* <p>
* invalid when isAsync() is false
*/
default long getAsyncTimeout() {
return 0;
}
}
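As a minimal sketch of how a delegate might opt into asynchronous execution, the class below overrides the three methods. To keep the example compilable on its own, it declares a local copy of the Delegate interface shown above; SlowReportAction, the pool size of 4, and the 3000 ms timeout are hypothetical choices, not engine defaults.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncDelegateSketch {
    // Local copy of the Delegate interface listed above (same defaults).
    public interface Delegate {
        default boolean isAsync() { return false; }
        default ExecutorService getAsyncExecutor() { return null; }
        default long getAsyncTimeout() { return 0; }
    }

    // Hypothetical delegate that runs on a dedicated business thread pool.
    public static class SlowReportAction implements Delegate {
        private static final ExecutorService BUSINESS_POOL =
                Executors.newFixedThreadPool(4, runnable -> {
                    Thread thread = new Thread(runnable, "business-pool");
                    thread.setDaemon(true); // do not keep the JVM alive
                    return thread;
                });

        @Override
        public boolean isAsync() { return true; }

        @Override
        public ExecutorService getAsyncExecutor() { return BUSINESS_POOL; }

        @Override
        public long getAsyncTimeout() { return 3000; } // milliseconds
    }

    public static void main(String[] args) {
        Delegate delegate = new SlowReportAction();
        System.out.println(delegate.isAsync() + ", timeout=" + delegate.getAsyncTimeout());
    }
}
```

Because a timeout only emits an interrupt signal, the business code still has to check for interruption if it should stop early.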
There are two types of gateways: ExclusiveGateway and JoinGateway. The following sections introduce these two gateways and their specific use cases.
For ordinary nodes, such as Action or Condition, the default behavior with multiple successor nodes is concurrent execution. For example, node 0 has n successor nodes: node 1, node 2, ..., node n. After node 0 is executed, it forks n execution paths. After forking, each execution path has its own environment variables, and the paths do not interfere with each other.
However, sometimes we want only one of these n successor nodes to be executed. In this case, we need to use the ExclusiveGateway.
The successor nodes of ExclusiveGateway must be Condition nodes. ExclusiveGateway ensures that at most one successor path is executed, and the conditions are evaluated in exactly the order in which the nodes are declared. In the diagram above, ExclusiveGateway has n successor Condition nodes: condition 1, condition 2, ..., condition n. First, condition 1 is executed. If the result is true, the subsequent nodes of condition 1 are executed. If the result is false, condition 2 is executed, and so on.
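The first-match behavior of ExclusiveGateway can be sketched as a loop over ordered conditions. ExclusiveGatewaySketch and selectBranch are hypothetical names, and the conditions are modeled as plain BooleanSupplier values rather than engine Condition nodes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

public class ExclusiveGatewaySketch {
    // Returns the index of the first condition that evaluates to true, or -1 if none does.
    // Conditions after the first match are never evaluated.
    public static int selectBranch(List<BooleanSupplier> conditions) {
        for (int i = 0; i < conditions.size(); i++) {
            if (conditions.get(i).getAsBoolean()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<BooleanSupplier> conditions = Arrays.<BooleanSupplier>asList(
                () -> false,
                () -> true,
                () -> true);
        System.out.println("selected branch index: " + selectBranch(conditions));
    }
}
```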
Ordinary nodes have exactly one predecessor node (the start node excepted), whereas the JoinGateway allows multiple predecessor nodes and aggregates them. The aggregation covers:
- Aggregation of environment variables: The JoinGateway aggregates the environment variables of all preceding branches. If different branches modify the same variable, the aggregated value of that variable may come from any of those branches.
- Aggregation of execution paths: The JoinGateway aggregates the trace information of all preceding branches in order of execution time.
The JoinGateway has three aggregation modes: hard, soft, and or:
- hard mode: The gateway allows passage only when all preceding nodes have arrived.
- soft mode: The gateway allows passage only when all reachable preceding nodes have arrived.
- or mode: The gateway allows passage when any preceding node arrives, and it allows passage only once.
In the example, actionB is on the TRUE branch of conditionA, which means actionB is executed only if the result of conditionA is true.
hard mode:
- If the result of conditionA is true, the gateway allows passage.
- If the result of conditionA is false, actionB is unreachable. Therefore, in hard mode, the gateway does not allow passage.
soft mode:
- If the result of conditionA is true, the gateway allows passage.
- If the result of conditionA is false, actionB is unreachable. In soft mode, the gateway still allows passage once all reachable preceding nodes (actionA only) have arrived.
or mode:
- The gateway allows passage when either actionA or actionB arrives.
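The three join modes can be compared with a small decision function over the state of each predecessor branch. JoinModeSketch, its enums, and canPass are hypothetical names for this illustration. The sketch assumes that soft mode needs at least one arrived branch, and it does not model the "only once" rule of or mode.

```java
import java.util.Arrays;
import java.util.List;

public class JoinModeSketch {
    public enum JoinMode { HARD, SOFT, OR }

    // State of one predecessor branch as seen by the gateway.
    public enum Branch { ARRIVED, PENDING, UNREACHABLE }

    public static boolean canPass(JoinMode mode, List<Branch> predecessors) {
        boolean anyArrived = predecessors.contains(Branch.ARRIVED);
        boolean anyPending = predecessors.contains(Branch.PENDING);
        boolean anyUnreachable = predecessors.contains(Branch.UNREACHABLE);
        switch (mode) {
            case HARD:
                // Every predecessor must have arrived.
                return anyArrived && !anyPending && !anyUnreachable;
            case SOFT:
                // Every reachable predecessor must have arrived (assumes at least one).
                return anyArrived && !anyPending;
            case OR:
                // Any arrival is enough.
                return anyArrived;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // conditionA returned false: actionA arrived, actionB is unreachable.
        List<Branch> state = Arrays.asList(Branch.ARRIVED, Branch.UNREACHABLE);
        for (JoinMode mode : JoinMode.values()) {
            System.out.println(mode + " -> " + canPass(mode, state));
        }
    }
}
```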
The flow engine provides the concept of sub flow, which makes flow orchestration much more flexible. We can put a group of nodes in a sub flow and easily observe the execution result of this group of nodes.
Consider the following example:
In this scenario, the sub flow is very similar to the or mode of JoinGateway, but there is a significant difference: the sub flow can observe the execution status of this group of nodes, while the or mode of JoinGateway cannot.
- When at least one node (conditionA, conditionB, conditionC) has a true execution result, the or mode of JoinGateway allows passage, and actionB is executed. If the execution results of conditionA, conditionB, and conditionC are all false, actionB is unreachable, and this cannot be observed.
- When the sub flow is executed successfully, actionB is executed. When the sub flow fails, actionC is executed. The execution result of the sub flow is therefore easy to observe.
How to define the success of the sub flow execution: When any branch executes successfully (i.e., any branch reaches a leaf node), the sub flow is considered successful; otherwise, the sub flow is considered failed.
Listeners for sub flow: We can configure global and node level listeners for the sub flow.
- Global level listeners behave the same as global level listeners for flows.
- Node level listeners behave the same as listeners for ordinary nodes.
- For sub flow, node level listeners and global level listeners are equivalent.
For Executable, including Action, Condition, and Listener:
- If a LinkExecutionTerminateException is thrown during execution, all subsequent nodes of that node are marked as unreachable, and the flow remains in a normal state.
- If any other exception is thrown during execution, the flow execution enters an exceptional state, which means all nodes of all branches of the flow terminate execution, and promise.isFailure is true.
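The two exception paths above can be sketched as a classification of what happens after a node's logic runs. ExecutableFailureSketch, Outcome, NodeLogic, and the nested LinkExecutionTerminateException are hypothetical stand-ins for this illustration; the engine's actual control flow is not shown in this document.

```java
public class ExecutableFailureSketch {
    // Hypothetical stand-in for the engine's LinkExecutionTerminateException.
    public static class LinkExecutionTerminateException extends RuntimeException {
    }

    public enum Outcome { CONTINUE, BRANCH_TERMINATED, FLOW_FAILED }

    // Node logic that may throw any exception, like the delegate methods above.
    public interface NodeLogic {
        void run() throws Exception;
    }

    public static Outcome execute(NodeLogic logic) {
        try {
            logic.run();
            return Outcome.CONTINUE;
        } catch (LinkExecutionTerminateException e) {
            // Successors become unreachable; the flow itself stays in a normal state.
            return Outcome.BRANCH_TERMINATED;
        } catch (Exception e) {
            // Any other exception fails the whole flow.
            return Outcome.FLOW_FAILED;
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -> { }));
        System.out.println(execute(() -> { throw new LinkExecutionTerminateException(); }));
        System.out.println(execute(() -> { throw new IllegalStateException("boom"); }));
    }
}
```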
{
actionA() {
actionB()
}
}
{
actionA(),
actionB(){
actionD(),
actionE()
},
actionC()
}
{
if(conditionA())
}
{
if(conditionA()) {
actionA()
}
}
{
if(conditionA()){
actionA(),
actionB(),
actionC()
}
}
{
if(conditionA()) {
actionA()
} else {
actionB()
}
}
{
if(conditionA()){
actionA(),
actionB()
} else {
actionC(),
actionD()
}
}
{
select {
if(conditionA()){
actionA()
},
if(conditionB()){
actionB()
},
if(conditionC()){
actionC()
}
}
}
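Note:
- Use & after a node to mark the nodes that need to be aggregated.
- The join mode is written after the join keyword. In the examples below, join & denotes hard mode, a plain join denotes soft mode, and join | denotes or mode.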
hard mode
{
join & {
actionA()&,
actionB(),
actionC()&
}
}
soft mode
{
join {
actionA()&,
actionB(),
actionC()&
}
}
or mode
{
join | {
actionA()&,
actionB(),
actionC()&
}
}
hard mode
{
join & {
if(conditionA()){
actionC()
} else {
actionD()&
},
actionA()&,
actionB()&
}
}
soft mode
{
join {
if(conditionA()){
actionC()
} else {
actionD()&
},
actionA()&,
actionB()&
}
}
or mode
{
join | {
if(conditionA()){
actionC()
} else {
actionD()&
},
actionA()&,
actionB()&
}
}
Note: Use & to indicate the nodes to be aggregated.
hard mode
{
join & {
actionA()&,
actionB(),
actionC()&
} then {
actionD()
}
}
soft mode
{
join {
actionA()&,
actionB(),
actionC()&
} then {
actionD()
}
}
or mode
{
join | {
actionA()&,
actionB(),
actionC()&
} then {
actionD()
}
}
hard mode
{
join & {
if(conditionA()){
actionC()
} else {
actionD()&
},
actionA()&,
actionB()&
} then {
actionE()
}
}
soft mode
{
join {
if(conditionA()){
actionC()
} else {
actionD()&
},
actionA()&,
actionB()&
} then {
actionE()
}
}
or mode
{
join | {
if(conditionA()){
actionC()
} else {
actionD()&
},
actionA()&,
actionB()&
} then {
actionE()
}
}
{
actionA(){
sub {
actionB(),
actionC(),
actionD()
}
}
}
{
actionA(){
sub {
join{
actionB()&,
actionC()&
}then {
actionD()
}
} then {
actionE()
}
}
}
{
actionA(){
sub{
select{
if(conditionA()){
actionB()
},
if(conditionB()){
actionC()
}
}
} then {
actionD()
} else{
actionE()
}
}
}
Global listeners
{
actionA()
} [listenerA(event="before")]
Action listeners
{
actionA()[listenerA(event="before")]
}
Condition listeners
{
if(conditionA() [listenerA(event="before")]) {
actionA()
}
}
ExclusiveGateway listeners
{
select {
if(conditionA()){
actionA()
},
if(conditionB()){
actionB()
}
} [listenerA(event="before")]
}
JoinGateway listeners
{
join {
actionA()&,
actionB()&
} [listenerA(event="before")] then {
actionD()
}
}
SubFlow listeners
{
sub {
actionA()
}[listenerA(event="before")],
sub {
actionB()
}[listenerB(event="success")] then {
actionC()
},
sub {
actionD()
}[listenerC(event="before")] then {
actionE()
} else {
actionF()
}
}
Multiple listeners are supported, separated by commas.
{
actionA()[listenerA(event="before"), listenerB(event="success")]
}
Parameter format: <name>=<value>
<value> supports placeholders and literals, including int, long, hexadecimal, octal, float, double, and String.
Placeholder format: ${xxx.yyy.zzz}
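To illustrate the difference between a placeholder and a literal, the sketch below resolves a raw parameter value against a map of environment variables. PlaceholderSketch and resolve are hypothetical names. The sketch assumes that the dotted placeholder name is looked up as one flat key; how the engine actually resolves dotted names is not specified here, and literal typing (int, long, and so on) is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {
    // Matches a value that consists entirely of one ${...} placeholder.
    private static final Pattern PLACEHOLDER = Pattern.compile("^\\$\\{([^}]+)\\}$");

    // Returns the environment value for a placeholder, or the raw text for a literal.
    public static Object resolve(String rawValue, Map<String, Object> env) {
        Matcher matcher = PLACEHOLDER.matcher(rawValue);
        if (matcher.matches()) {
            // Assumption: the whole dotted name is used as a flat key.
            return env.get(matcher.group(1));
        }
        return rawValue;
    }

    public static void main(String[] args) {
        Map<String, Object> env = new HashMap<>();
        env.put("device.temperature", 31);
        System.out.println(resolve("${device.temperature}", env));
        System.out.println(resolve("30", env));
    }
}
```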
After a flow is triggered, it is executed asynchronously in a thread pool. A Promise is provided to observe the execution status of the flow and to obtain its execution result (similar to Netty's ChannelFuture/ChannelPromise). It offers the following methods (all thread-safe):
- boolean isCancelled(): Determines whether the flow has been canceled.
- boolean isDone(): Determines whether the flow has completed (succeeded, failed, or canceled).
- boolean isSuccess(): Determines whether the flow has executed successfully.
- boolean isFailure(): Determines whether the flow has failed.
- Throwable cause(): Retrieves the exception when the flow execution fails.
- boolean tryCancel(): Tries to cancel the flow; succeeds only if the flow has not yet completed.
- boolean trySuccess(T outcome): Tries to mark the flow as successful; succeeds only if the flow has not yet completed and this call wins the race against other attempts.
- boolean tryFailure(Throwable cause): Tries to mark the flow as failed; succeeds only if the flow has not yet completed and this call wins the race against other attempts.
- Promise addListener(PromiseListener listener): Adds a listener. The listener is guaranteed to be executed, whether it is added during flow execution or after completion.
- void sync(): Blocks until the flow completes, without throwing an exception.
- boolean await(long timeout, TimeUnit unit): Blocks for at most the specified time. Returns true if the flow completes (success, failure, or cancellation) within that time; otherwise returns false.
- T get(): Blocks until the execution result of the flow is available. Throws an exception if the flow execution fails or is canceled.
- T get(long timeout, TimeUnit unit): Blocks for at most the specified time while waiting for the execution result of the flow. Throws an exception if the flow execution fails or is canceled within that time.
Promise listeners can be registered on a Promise, and they are triggered when the flow terminates normally or abnormally, similar to Netty's ChannelFuture/ChannelPromise.
- When a Promise is completed, each PromiseListener is executed in the order in which it was added, and each is executed only once.
- Regardless of when a PromiseListener is added (before or after Promise completion), every PromiseListener is guaranteed to be triggered.
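The listener guarantees above (run once, in registration order, even when added after completion) can be sketched with a very small promise. SimplePromise is written only for this illustration and is not the engine's Promise; it supports success only, and listener ordering under concurrent registration is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PromiseListenerSketch {
    // Minimal promise that models only the listener guarantees.
    public static class SimplePromise<T> {
        private final List<Consumer<SimplePromise<T>>> listeners = new ArrayList<>();
        private boolean done;
        private T outcome;

        public boolean trySuccess(T value) {
            List<Consumer<SimplePromise<T>>> toNotify;
            synchronized (this) {
                if (done) {
                    return false; // already completed by an earlier attempt
                }
                done = true;
                outcome = value;
                toNotify = new ArrayList<>(listeners);
                listeners.clear(); // each listener runs at most once
            }
            for (Consumer<SimplePromise<T>> listener : toNotify) {
                listener.accept(this);
            }
            return true;
        }

        public SimplePromise<T> addListener(Consumer<SimplePromise<T>> listener) {
            synchronized (this) {
                if (!done) {
                    listeners.add(listener);
                    return this;
                }
            }
            listener.accept(this); // added after completion: run right away
            return this;
        }

        public synchronized boolean isDone() { return done; }

        public synchronized T get() { return outcome; }
    }

    public static void main(String[] args) {
        SimplePromise<String> promise = new SimplePromise<>();
        promise.addListener(p -> System.out.println("early listener: " + p.get()));
        promise.trySuccess("ok");
        promise.addListener(p -> System.out.println("late listener: " + p.get()));
    }
}
```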
The flow engine provides the ability to pause and resume an ExecutionLink. The current execution branch can be paused with ExecutableContext#pauseExecutionLink, which returns an ExecutionLinkPausePromise object. Later, the branch can be resumed by calling ExecutionLinkPausePromise#trySuccess(null). Alternatively, the pause can be canceled by calling ExecutionLinkPausePromise#tryCancel (which causes the entire flow execution to fail), or an exception can be raised by calling ExecutionLinkPausePromise#tryFailure(exception) (if the exception is a LinkExecutionTerminateException, only the current branch is terminated; otherwise the entire flow execution fails).
Note that pausing does not interrupt the execution of a Delegate (Action, Condition, Listener) or of any DelegateInterceptor. Both are atomic execution units. The current execution branch pauses just before the next Element, as shown in the diagram below:
- elementA: can be Action, Condition, or Listener
- elementB: can be Action, Condition, Listener, JoinGateway, ExclusiveGateway, or SubFlow
The flow engine provides an interceptor mechanism similar to Spring AOP. The core interfaces are DelegateInterceptor and DelegateInvocation. With interceptors, we can easily implement cross-cutting capabilities such as execution statistics and logging.
DelegateInterceptor is similar to MethodInterceptor and can additionally select the nodes it applies to through the matches method.
package com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor;
/**
* @author hechenfeng
* @date 2019/4/27
*/
public interface DelegateInterceptor {
/**
* whether given executableName matches this interceptor
*
* @param executableName executable name
* @return whether matches
*/
default boolean matches(String executableName) {
return true;
}
/**
* Implement this method to perform extra treatments before and
* after the invocation.
*
* @param delegateInvocation delegate invoker
* @return result of target delegate call
*/
Object invoke(DelegateInvocation delegateInvocation) throws Throwable;
}
DelegateInvocation is similar to MethodInvocation.
package com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor;
import com.github.liuyehcf.framework.flow.engine.model.Element;
import com.github.liuyehcf.framework.flow.engine.model.ElementType;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.Delegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ExecutableContext;
/**
* @author hechenfeng
* @date 2019/4/27
*/
public interface DelegateInvocation {
/**
* Proceed to the next interceptor in the chain.
*
* @return result of target delegate call
*/
Object proceed() throws Throwable;
/**
* get target delegate
*
* @return target delegate
*/
Delegate getDelegate();
/**
* get executable context
*
* @return executable context
*/
ExecutableContext<? extends Element> getExecutableContext();
/**
* type of delegate (action/condition/listener)
*
* @return delegate type
* @see com.github.liuyehcf.framework.flow.engine.model.Executable
* @see com.github.liuyehcf.framework.flow.engine.model.ElementType
*/
ElementType getType();
/**
* get array of argument name
*
* @return array of argument name
*/
String[] getArgumentNames();
/**
* get array of argument value
*
* @return array of argument value
*/
Object[] getArgumentValues();
}
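How an interceptor chain of this shape typically works can be sketched as follows. The interfaces below are simplified local stand-ins (DelegateInvocation keeps only proceed), and InterceptorChainSketch, Target, run, and LoggingInterceptor are hypothetical names. The sketch shows the general chain-of-interceptors technique, not the engine's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class InterceptorChainSketch {
    // Simplified stand-ins for the two interfaces listed above.
    public interface DelegateInvocation {
        Object proceed() throws Throwable;
    }

    public interface DelegateInterceptor {
        default boolean matches(String executableName) { return true; }
        Object invoke(DelegateInvocation invocation) throws Throwable;
    }

    // The wrapped delegate call at the end of the chain.
    public interface Target {
        Object call() throws Throwable;
    }

    // Example interceptor: records events around executables whose name has a given prefix.
    public static class LoggingInterceptor implements DelegateInterceptor {
        private final String prefix;
        private final List<String> log;

        public LoggingInterceptor(String prefix, List<String> log) {
            this.prefix = prefix;
            this.log = log;
        }

        @Override
        public boolean matches(String executableName) {
            return executableName.startsWith(prefix);
        }

        @Override
        public Object invoke(DelegateInvocation invocation) throws Throwable {
            log.add("before");
            try {
                return invocation.proceed();
            } finally {
                log.add("after");
            }
        }
    }

    // Runs the target through all interceptors whose matches(...) accepts the name.
    public static Object run(String executableName, List<DelegateInterceptor> interceptors, Target target) {
        List<DelegateInterceptor> matched = new ArrayList<>();
        for (DelegateInterceptor interceptor : interceptors) {
            if (interceptor.matches(executableName)) {
                matched.add(interceptor);
            }
        }
        DelegateInvocation chain = new DelegateInvocation() {
            private int index = 0;

            @Override
            public Object proceed() throws Throwable {
                if (index == matched.size()) {
                    return target.call(); // end of chain: invoke the delegate
                }
                return matched.get(index++).invoke(this);
            }
        };
        try {
            return chain.proceed();
        } catch (RuntimeException | Error e) {
            throw e;
        } catch (Throwable t) {
            throw new IllegalStateException(t);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<DelegateInterceptor> interceptors = new ArrayList<>();
        interceptors.add(new LoggingInterceptor("print", log));
        Object result = run("printAction", interceptors, () -> "done");
        System.out.println(result + " " + log);
    }
}
```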
- Execution Links: The number of execution links is equal to the number of parallel links.
- Execution Trace: Each execution link contains multiple node traces. Each trace records:
  - Unique execution ID
  - Execution node name
  - Execution node parameters
  - Execution node start time
  - Execution node end time
  - Set of variables changed by the execution node
  - Exceptions captured during execution
  - Collection of attributes of the execution node
The Spring Boot Starter allows the flow engine to be integrated without any configuration.
Maven Dependency
<dependency>
<groupId>com.github.liuyehcf</groupId>
<artifactId>flow-engine-spring-boot-starter</artifactId>
<version>1.4.4</version>
</dependency>
- If using Spring's @Component or related annotations, the node name is the bean name.
- If using the @ActionBean annotation, the node name is given by ActionBean.names().
- If DelegateField is configured, the bean must be declared with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE).
package com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.ActionDelegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ActionContext;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.field.DelegateField;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* @author hechenfeng
* @date 2020/1/4
*/
@Component(value = "printAction")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PrintAction implements ActionDelegate {
private DelegateField content;
public void setContent(DelegateField content) {
this.content = content;
}
@Override
public void onAction(ActionContext context) {
System.out.println("printAction. content=" + content.getValue());
}
}
- If using Spring's @Component or related annotations, the node name is the bean name.
- If using the @ConditionBean annotation, the node name is given by ConditionBean.names().
- If DelegateField is configured, the bean must be declared with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE).
package com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.ConditionDelegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ConditionContext;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.field.DelegateField;
import com.github.liuyehcf.framework.flow.engine.spring.boot.starter.annotation.ConditionBean;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
/**
* @author hechenfeng
* @date 2020/1/4
*/
@ConditionBean(names = "printCondition")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PrintCondition implements ConditionDelegate {
private DelegateField content;
public void setContent(DelegateField content) {
this.content = content;
}
@Override
public boolean onCondition(ConditionContext conditionContext) throws Exception {
System.out.println("printCondition. content=" + content.getValue());
return true;
}
}
- If using Spring's @Component or related annotations, the node name will be the bean name.
- If using @ListenerBean annotation, the node name will be ListenerBean.name().
- If DelegateField is configured, the type of the bean must be marked as @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE).
package com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.ListenerDelegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ListenerContext;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.field.DelegateField;
import com.github.liuyehcf.framework.flow.engine.spring.boot.starter.annotation.ListenerBean;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
/**
* @author hechenfeng
* @date 2020/1/4
*/
@ListenerBean(names = "printListener")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PrintListener implements ListenerDelegate {
private DelegateField content;
public void setContent(DelegateField content) {
this.content = content;
}
@Override
public void onBefore(ListenerContext context) throws Exception {
System.out.println(String.format("printListener onBefore. content=%s", content.getValue()));
}
@Override
public void onSuccess(ListenerContext context, Object result) throws Exception {
System.out.println(String.format("printListener onSuccess. content=%s", content.getValue()));
}
@Override
public void onFailure(ListenerContext context, Throwable cause) throws Exception {
System.out.println(String.format("printListener onFailure. content=%s", content.getValue()));
}
}
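Configuring DelegateInterceptor is similar to Spring AOP. The order of interceptors can be specified using the @Order annotation, and it behaves the same as in Spring AOP: an interceptor with a lower order value wraps those with higher values, so it is entered first and exited last.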
package com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor.DelegateInterceptor;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor.DelegateInvocation;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @author hechenfeng
* @date 2020/1/4
*/
@Component
@Order(value = 1)
public class DelegateInterceptorOrder1 implements DelegateInterceptor {
@Override
public Object invoke(DelegateInvocation delegateInvocation) throws Throwable {
System.out.println("enter DelegateInterceptorOrder1");
Object proceed = delegateInvocation.proceed();
System.out.println("exit DelegateInterceptorOrder1");
return proceed;
}
}
package com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor.DelegateInterceptor;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor.DelegateInvocation;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @author hechenfeng
* @date 2020/1/4
*/
@Component
@Order(value = 2)
public class DelegateInterceptorOrder2 implements DelegateInterceptor {
@Override
public Object invoke(DelegateInvocation delegateInvocation) throws Throwable {
System.out.println("enter DelegateInterceptorOrder2");
Object proceed = delegateInvocation.proceed();
System.out.println("exit DelegateInterceptorOrder2");
return proceed;
}
}
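The nesting produced by ordered interceptors can be sketched in plain Java as a chain in which each interceptor wraps the next one. The Invocation and Interceptor interfaces, the ChainSketch class, and its methods are hypothetical stand-ins for illustration only. They mirror the shape of DelegateInvocation.proceed() and DelegateInterceptor.invoke() but are not the engine's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChainSketch {
    // Hypothetical stand-in for DelegateInvocation.
    interface Invocation {
        Object proceed();
    }

    // Hypothetical stand-in for DelegateInterceptor.
    interface Interceptor {
        Object invoke(Invocation invocation);
    }

    // Wraps the target so that the first interceptor in the list is outermost.
    static Invocation chain(List<Interceptor> interceptors, Invocation target) {
        Invocation current = target;
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            Interceptor interceptor = interceptors.get(i);
            Invocation next = current;
            current = () -> interceptor.invoke(next);
        }
        return current;
    }

    // Builds an interceptor that records when it is entered and exited.
    static Interceptor logging(String name, List<String> log) {
        return invocation -> {
            log.add("enter " + name);
            try {
                return invocation.proceed();
            } finally {
                log.add("exit " + name);
            }
        };
    }

    // Runs a two-interceptor chain around a dummy delegate and returns the log.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Interceptor> ordered = Arrays.asList(logging("Order1", log), logging("Order2", log));
        chain(ordered, () -> {
            log.add("delegate");
            return null;
        }).proceed();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Building the chain from the last interceptor backwards makes the first list element the outermost wrapper, which is why its "enter" line comes first and its "exit" line comes last.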
package com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo;
import com.alibaba.fastjson.JSON;
import com.github.liuyehcf.framework.common.tools.promise.Promise;
import com.github.liuyehcf.framework.common.tools.promise.PromiseListener;
import com.github.liuyehcf.framework.flow.engine.ExecutionCondition;
import com.github.liuyehcf.framework.flow.engine.FlowEngine;
import com.github.liuyehcf.framework.flow.engine.runtime.statistics.ExecutionInstance;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @author hechenfeng
* @date 2020/1/4
*/
@SpringBootApplication(scanBasePackages = {"com.github.liuyehcf.framework.flow.engine.spring.boot.starter.demo"})
public class DemoApplication {
@Resource
private FlowEngine flowEngine;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class);
}
@PostConstruct
public void flow() {
String dsl = "{\n" +
" if(printCondition(content=\"hello, \")) {\n" +
" printAction(content=\"hechenfeng\")\n" +
" }\n" +
"}";
Promise<ExecutionInstance> promise = flowEngine.startFlow(new ExecutionCondition(dsl));
// Register a listener
promise.addListener(new PromiseListener<ExecutionInstance>() {
@Override
public void operationComplete(Promise<ExecutionInstance> promise) {
System.out.println("trigger promise listener");
if (promise.isSuccess()) {
System.out.println(JSON.toJSONString(promise.get()));
} else if (promise.isFailure() && promise.cause() != null) {
promise.cause().printStackTrace();
}
}
});
}
}
Output
enter DelegateInterceptorOrder1
enter DelegateInterceptorOrder2
printCondition. content=hello,
exit DelegateInterceptorOrder2
exit DelegateInterceptorOrder1
enter DelegateInterceptorOrder1
enter DelegateInterceptorOrder2
printAction. content=hechenfeng
exit DelegateInterceptorOrder2
exit DelegateInterceptorOrder1
trigger promise listener
{"attributes":{},"endTimestamp":1601967730844,"env":{},"flow":{"elements":[{"flow":{"$ref":"$.flow"},"id":"1","linkType":"NORMAL","listeners":[],"predecessors":[],"successors":[{"argumentNames":["content"],"argumentValues":["hello, "],"flow":{"$ref":"$.flow"},"id":"2","linkType":"NORMAL","listeners":[],"name":"printCondition","predecessors":[{"$ref":"$.flow.elements[0]"}],"successors":[{"argumentNames":["content"],"argumentValues":["hechenfeng"],"flow":{"$ref":"$.flow"},"id":"3","linkType":"TRUE","listeners":[],"name":"printAction","predecessors":[{"$ref":"$.flow.elements[0].successors[0]"}],"successors":[],"type":"ACTION"}],"type":"CONDITION"}],"type":"START"},{"$ref":"$.flow.elements[0].successors[0]"},{"$ref":"$.flow.elements[0].successors[0].successors[0]"}],"ends":[{"$ref":"$.flow.elements[0].successors[0].successors[0]"}],"events":[],"id":"88ab739c-6e5f-408e-8f12-81594f8faa8a","linkType":"NORMAL","listeners":[],"name":"e37b560c-6ae9-4ead-a13f-e329ff736747","predecessors":[],"start":{"$ref":"$.flow.elements[0]"},"successors":[],"type":"SUB_FLOW"},"id":"6ea541ba-8890-4b01-b417-bffe89a2c91b","links":[{"env":{"$ref":"$.env"},"id":"a69e2290-1768-4dde-95af-1bc9e59f8fdd","traces":[{"endTimestamp":1601967730786,"executionId":0,"id":"1","startTimestamp":1601967730786,"type":"START","useTimeNanos":0},{"arguments":[{"name":"content","value":"hello, "}],"attributes":{},"endTimestamp":1601967730836,"executionId":1,"id":"2","name":"printCondition","propertyUpdates":[],"result":true,"startTimestamp":1601967730834,"type":"CONDITION","useTimeNanos":2173256},{"arguments":[{"name":"content","value":"hechenfeng"}],"attributes":{},"endTimestamp":1601967730843,"executionId":2,"id":"3","name":"printAction","propertyUpdates":[],"startTimestamp":1601967730842,"type":"ACTION","useTimeNanos":208637}]}],"startTimestamp":1601967730781,"traces":[],"unreachableLinks":[],"useTimeNanos":63322057}
Maven Dependency
<dependency>
<groupId>com.github.liuyehcf</groupId>
<artifactId>flow-engine</artifactId>
<version>1.4.4</version>
</dependency>
Example Code
package com.github.liuyehcf.framework.flow.engine.test.demo;
import com.alibaba.fastjson.JSON;
import com.github.liuyehcf.framework.common.tools.promise.Promise;
import com.github.liuyehcf.framework.common.tools.promise.PromiseListener;
import com.github.liuyehcf.framework.flow.engine.ExecutionCondition;
import com.github.liuyehcf.framework.flow.engine.runtime.DefaultFlowEngine;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.ActionDelegate;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.context.ActionContext;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.field.DelegateField;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor.DelegateInterceptor;
import com.github.liuyehcf.framework.flow.engine.runtime.delegate.interceptor.DelegateInvocation;
import com.github.liuyehcf.framework.flow.engine.runtime.statistics.ExecutionInstance;
/**
* @author hechenfeng
* @date 2020/1/4
*/
public class ReadmeDemo {
public static void main(String[] args) {
DefaultFlowEngine flowEngine = new DefaultFlowEngine();
// Register an action
flowEngine.registerActionDelegateFactory("greetAction", () -> {
return new ActionDelegate() {
private DelegateField name;
@Override
public void onAction(ActionContext context) throws Exception {
System.out.println(String.format("Hello, %s. This is flow engine!", (String) name.getValue()));
}
};
});
// Register an interceptor
flowEngine.registerDelegateInterceptorFactory(() ->
new DelegateInterceptor() {
@Override
public Object invoke(DelegateInvocation delegateInvocation) throws Throwable {
try {
System.err.println("enter MyDelegateInterceptor");
System.err.println(delegateInvocation.getExecutableContext().getFlowId());
System.err.println(delegateInvocation.getExecutableContext().getFlowName());
System.err.println(delegateInvocation.getExecutableContext().getInstanceId());
System.err.println(delegateInvocation.getExecutableContext().getName());
System.err.println(String.format("argumentNames=%s", JSON.toJSONString(delegateInvocation.getArgumentNames())));
System.err.println(String.format("argumentValues=%s", JSON.toJSONString(delegateInvocation.getArgumentValues())));
return delegateInvocation.proceed();
} finally {
System.err.println(String.format("attributes=%s", JSON.toJSONString(delegateInvocation.getExecutableContext().getGlobalAttributes())));
System.err.println("exit MyDelegateInterceptor");
}
}
});
String dsl = "{\n" +
" greetAction(name=\"hechenfeng\")\n" +
"}";
Promise<ExecutionInstance> promise = flowEngine.startFlow(new ExecutionCondition(dsl));
// Register a listener
promise.addListener(new PromiseListener<ExecutionInstance>() {
@Override
public void operationComplete(Promise<ExecutionInstance> promise) {
System.out.println("trigger promise listener");
if (promise.isSuccess()) {
System.out.println(JSON.toJSONString(promise.get()));
} else if (promise.isFailure() && promise.cause() != null) {
promise.cause().printStackTrace();
}
}
});
// Block synchronously until the flow finishes executing
promise.sync();
flowEngine.shutdown();
}
}
Output
a0f9c342-241a-4489-badd-e8a5c9221e43
8f02269a-05cd-4633-964e-f7b7658d385f
greetAction
argumentNames=["name"]
argumentValues=["hechenfeng"]
attributes={}
exit MyDelegateInterceptor
Hello, hechenfeng. This is flow engine!
trigger promise listener
{"attributes":{},"endTimestamp":1601967441173,"env":{},"flow":{"elements":[{"flow":{"$ref":"$.flow"},"id":"1","linkType":"NORMAL","listeners":[],"predecessors":[],"successors":[{"argumentNames":["name"],"argumentValues":["hechenfeng"],"flow":{"$ref":"$.flow"},"id":"2","linkType":"NORMAL","listeners":[],"name":"greetAction","predecessors":[{"$ref":"$.flow.elements[0]"}],"successors":[],"type":"ACTION"}],"type":"START"},{"$ref":"$.flow.elements[0].successors[0]"}],"ends":[{"$ref":"$.flow.elements[0].successors[0]"}],"events":[],"id":"81f6f256-6c96-4a57-aa05-4bb907a8d68c","linkType":"NORMAL","listeners":[],"name":"a0f9c342-241a-4489-badd-e8a5c9221e43","predecessors":[],"start":{"$ref":"$.flow.elements[0]"},"successors":[],"type":"SUB_FLOW"},"id":"8f02269a-05cd-4633-964e-f7b7658d385f","links":[{"env":{"$ref":"$.env"},"id":"f4490b46-13d9-4ab6-a579-29605807e3da","traces":[{"endTimestamp":1601967441081,"executionId":0,"id":"1","startTimestamp":1601967441081,"type":"START","useTimeNanos":0},{"arguments":[{"name":"name","value":"hechenfeng"}],"attributes":{},"endTimestamp":1601967441168,"executionId":1,"id":"2","name":"greetAction","propertyUpdates":[],"startTimestamp":1601967441092,"type":"ACTION","useTimeNanos":75623038}]}],"startTimestamp":1601967441071,"traces":[],"unreachableLinks":[],"useTimeNanos":102691937}