From af9c1c96227bf0a612ed77653c350196b7075cca Mon Sep 17 00:00:00 2001 From: SerLiunx-ctrl <17689543@qq.com> Date: Wed, 5 Mar 2025 14:50:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E5=8F=AF=E8=AE=A1?= =?UTF-8?q?=E6=95=B0=E7=9A=84=E6=8B=92=E7=BB=9D=E7=AD=96=E7=95=A5.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../machine/AbstractStateMachine.java | 7 ++++ .../machine/StateMachineContext.java | 6 ++- .../CountableRejectedExecutionHandler.java | 27 +++++++++++++ ...aultCountableRejectedExecutionHandler.java | 40 +++++++++++++++++++ .../support/ExecutorUtils.java | 10 ++--- .../serliunx/statemanagement/MachineTest.java | 2 - 6 files changed, 83 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java create mode 100644 src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java diff --git a/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java b/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java index e3acb31..2df3eec 100644 --- a/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java +++ b/src/main/java/com/serliunx/statemanagement/machine/AbstractStateMachine.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -20,6 +21,9 @@ import java.util.function.Consumer; */ public abstract class AbstractStateMachine extends AbstractStateManager implements StateMachine { + /** + * 状态机上下文 + */ protected final StateMachineContext context; /** @@ -52,6 +56,9 @@ public abstract class AbstractStateMachine extends AbstractStateManager im if (executor instanceof ExecutorService) { ExecutorService es = (ExecutorService) executor; es.shutdown(); + if (!es.awaitTermination(10, TimeUnit.SECONDS)) { + es.shutdownNow(); + } } else if (executor instanceof AutoCloseable) { AutoCloseable ac = (AutoCloseable) executor; ac.close(); diff --git a/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java b/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java index e9e2420..6487bd2 100644 --- a/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java +++ b/src/main/java/com/serliunx/statemanagement/machine/StateMachineContext.java @@ -1,11 +1,15 @@ package com.serliunx.statemanagement.machine; import com.serliunx.statemanagement.machine.handler.StateHandlerWrapper; +import com.serliunx.statemanagement.support.DefaultCountableRejectedExecutionHandler; import com.serliunx.statemanagement.support.ExecutorUtils; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -62,7 +66,7 @@ public final class StateMachineContext { */ private Executor executorAutoConfiguration(Executor source) { if (source == null) { - return ExecutorUtils.adaptiveThreadPool(); + return ExecutorUtils.adaptiveThreadPool(new DefaultCountableRejectedExecutionHandler()); } return source; } diff --git a/src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java b/src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java new file mode 100644 index 0000000..0854a60 --- /dev/null +++ b/src/main/java/com/serliunx/statemanagement/support/CountableRejectedExecutionHandler.java @@ -0,0 +1,27 @@ +package com.serliunx.statemanagement.support; + +import java.util.concurrent.RejectedExecutionHandler; + +/** + * 附带计数的拒绝策略 + * + * @author SerLiunx + * @version 1.0.0 + * @since 2025/3/5 + */ +public interface CountableRejectedExecutionHandler extends RejectedExecutionHandler { + + /** + * 获取当前拒绝的任务数量 + * + * @return 当目前为止所拒绝的任务数量 + */ + long getCount(); + + /** + * 获取最后一次被拒绝的任务 + * + * @return 最后一次被拒绝的任务 + */ + Runnable getLastRejectedTask(); +} diff --git a/src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java b/src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java new file mode 100644 index 0000000..f23748d --- /dev/null +++ b/src/main/java/com/serliunx/statemanagement/support/DefaultCountableRejectedExecutionHandler.java @@ -0,0 +1,40 @@ +package com.serliunx.statemanagement.support; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 附带计数的拒绝策略默认实现(丢弃任务并计数) + * + * @author SerLiunx + * @version 1.0.0 + * @since 2025/3/5 + */ +public class DefaultCountableRejectedExecutionHandler implements CountableRejectedExecutionHandler { + + /** + * 计数器 + */ + private final AtomicLong counter = new AtomicLong(0); + + /** + * 最后一次被拒绝的任务 + */ + private volatile Runnable last = null; + + @Override + public long getCount() { + return counter.get(); + } + + @Override + public Runnable getLastRejectedTask() { + return last; + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + last = r; + counter.incrementAndGet(); + } +} diff --git a/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java b/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java index ad2e616..e3ff2df 100644 --- a/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java +++ b/src/main/java/com/serliunx/statemanagement/support/ExecutorUtils.java @@ -1,9 +1,6 @@ package com.serliunx.statemanagement.support; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 线程池相关工具类 @@ -22,10 +19,11 @@ public final class ExecutorUtils { * * @return 执行器(线程池) */ - public static Executor adaptiveThreadPool() { + public static Executor adaptiveThreadPool(RejectedExecutionHandler rejectedExecutionHandler) { final int processors = Runtime.getRuntime().availableProcessors(); return new ThreadPoolExecutor(processors * 2, processors * 4, 5, - TimeUnit.MINUTES, new ArrayBlockingQueue<>(processors * 8), new NamedThreadFactory("state-process-%s")); + TimeUnit.MINUTES, new ArrayBlockingQueue<>(processors * 8), + new NamedThreadFactory("state-process-%s"), rejectedExecutionHandler); } } diff --git a/src/test/java/com/serliunx/statemanagement/MachineTest.java b/src/test/java/com/serliunx/statemanagement/MachineTest.java index 170bb47..4b40116 100644 --- a/src/test/java/com/serliunx/statemanagement/MachineTest.java +++ b/src/test/java/com/serliunx/statemanagement/MachineTest.java @@ -42,8 +42,6 @@ public class MachineTest { .build(); stateMachine.publish(PrinterEvent.TURN_ON); - - stateMachine.close(); } @Test