diff --git a/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java b/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java index e3acb31..2df3eec 100644 --- a/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java +++ b/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -20,6 +21,9 @@ import java.util.function.Consumer; */ public abstract class AbstractStateMachine extends AbstractStateManager implements StateMachine { + /** + * 状态机上下文 + */ protected final StateMachineContext context; /** @@ -52,6 +56,9 @@ public abstract class AbstractStateMachine extends AbstractStateManager im if (executor instanceof ExecutorService) { ExecutorService es = (ExecutorService) executor; es.shutdown(); + if (!es.awaitTermination(10, TimeUnit.SECONDS)) { + es.shutdownNow(); + } } else if (executor instanceof AutoCloseable) { AutoCloseable ac = (AutoCloseable) executor; ac.close(); diff --git a/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java b/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java index e9e2420..6487bd2 100644 --- a/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java +++ b/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java @@ -1,11 +1,15 @@ package com.serliunx.statemanagement.machine; import com.serliunx.statemanagement.machine.handler.StateHandlerWrapper; +import com.serliunx.statemanagement.support.DefaultCountableRejectedExecutionHandler; import com.serliunx.statemanagement.support.ExecutorUtils; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -62,7 +66,7 @@ public final class StateMachineContext { */ private Executor executorAutoConfiguration(Executor source) { if (source == null) { - return ExecutorUtils.adaptiveThreadPool(); + return ExecutorUtils.adaptiveThreadPool(new DefaultCountableRejectedExecutionHandler()); } return source; } diff --git a/src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java b/src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java new file mode 100644 index 0000000..0854a60 --- /dev/null +++ b/src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java @@ -0,0 +1,27 @@ +package com.serliunx.statemanagement.support; + +import java.util.concurrent.RejectedExecutionHandler; + +/** + * 附带计数的拒绝策略 + * + * @author SerLiunx + * @version 1.0.0 + * @since 2025/3/5 + */ +public interface CountableRejectedExecutionHandler extends RejectedExecutionHandler { + + /** + * 获取当前拒绝的任务数量 + * + * @return 当目前为止所拒绝的任务数量 + */ + long getCount(); + + /** + * 获取最后一次被拒绝的任务 + * + * @return 最后一次被拒绝的任务 + */ + Runnable getLastRejectedTask(); +} diff --git a/src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java b/src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java new file mode 100644 index 0000000..f23748d --- /dev/null +++ b/src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java @@ -0,0 +1,40 @@ +package com.serliunx.statemanagement.support; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 附带计数的拒绝策略默认实现(丢弃任务并计数) + * + * @author SerLiunx + * @version 1.0.0 + * @since 2025/3/5 + */ +public class DefaultCountableRejectedExecutionHandler implements CountableRejectedExecutionHandler { + + /** + * 计数器 + */ + private final AtomicLong counter = new AtomicLong(0); + + /** + * 最后一次被拒绝的任务 + */ + private volatile Runnable last = null; + + @Override + public long getCount() { + return counter.get(); + } + + @Override + public Runnable getLastRejectedTask() { + return last; + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + last = r; + counter.incrementAndGet(); + } +} diff --git a/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java b/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java index ad2e616..e3ff2df 100644 --- a/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java +++ b/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java @@ -1,9 +1,6 @@ package com.serliunx.statemanagement.support; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 线程池相关工具类 @@ -22,10 +19,11 @@ public final class ExecutorUtils { * * @return 执行器(线程池) */ - public static Executor adaptiveThreadPool() { + public static Executor adaptiveThreadPool(RejectedExecutionHandler rejectedExecutionHandler) { final int processors = Runtime.getRuntime().availableProcessors(); return new ThreadPoolExecutor(processors * 2, processors * 4, 5, - TimeUnit.MINUTES, new ArrayBlockingQueue<>(processors * 8), new NamedThreadFactory("state-process-%s")); + TimeUnit.MINUTES, new ArrayBlockingQueue<>(processors * 8), + new NamedThreadFactory("state-process-%s"), rejectedExecutionHandler); } } diff --git a/src/test/java/com/serliunx/statemanagement/MachineTest.java b/src/test/java/com/serliunx/statemanagement/MachineTest.java index 170bb47..4b40116 100644 --- a/src/test/java/com/serliunx/statemanagement/MachineTest.java +++ b/src/test/java/com/serliunx/statemanagement/MachineTest.java @@ -42,8 +42,6 @@ public class MachineTest { .build(); stateMachine.publish(PrinterEvent.TURN_ON); - - stateMachine.close(); } @Test