feat: 新增可计数的拒绝策略.

This commit is contained in:
2025-03-05 14:50:36 +08:00
parent f7e4400e37
commit af9c1c9622
6 changed files with 83 additions and 9 deletions

View File

@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@@ -20,6 +21,9 @@ import java.util.function.Consumer;
*/ */
public abstract class AbstractStateMachine<S> extends AbstractStateManager<S> implements StateMachine<S> { public abstract class AbstractStateMachine<S> extends AbstractStateManager<S> implements StateMachine<S> {
/**
* 状态机上下文
*/
protected final StateMachineContext<S> context; protected final StateMachineContext<S> context;
/** /**
@@ -52,6 +56,9 @@ public abstract class AbstractStateMachine<S> extends AbstractStateManager<S> im
if (executor instanceof ExecutorService) { if (executor instanceof ExecutorService) {
ExecutorService es = (ExecutorService) executor; ExecutorService es = (ExecutorService) executor;
es.shutdown(); es.shutdown();
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
es.shutdownNow();
}
} else if (executor instanceof AutoCloseable) { } else if (executor instanceof AutoCloseable) {
AutoCloseable ac = (AutoCloseable) executor; AutoCloseable ac = (AutoCloseable) executor;
ac.close(); ac.close();

View File

@@ -1,11 +1,15 @@
package com.serliunx.statemanagement.machine; package com.serliunx.statemanagement.machine;
import com.serliunx.statemanagement.machine.handler.StateHandlerWrapper; import com.serliunx.statemanagement.machine.handler.StateHandlerWrapper;
import com.serliunx.statemanagement.support.DefaultCountableRejectedExecutionHandler;
import com.serliunx.statemanagement.support.ExecutorUtils; import com.serliunx.statemanagement.support.ExecutorUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@@ -62,7 +66,7 @@ public final class StateMachineContext<S> {
*/ */
private Executor executorAutoConfiguration(Executor source) { private Executor executorAutoConfiguration(Executor source) {
if (source == null) { if (source == null) {
return ExecutorUtils.adaptiveThreadPool(); return ExecutorUtils.adaptiveThreadPool(new DefaultCountableRejectedExecutionHandler());
} }
return source; return source;
} }

View File

@@ -0,0 +1,27 @@
package com.serliunx.statemanagement.support;
import java.util.concurrent.RejectedExecutionHandler;
/**
* 附带计数的拒绝策略
*
* @author <a href="mailto:serliunx@yeah.net">SerLiunx</a>
* @version 1.0.0
* @since 2025/3/5
*/
public interface CountableRejectedExecutionHandler extends RejectedExecutionHandler {
/**
* 获取当前拒绝的任务数量
*
* @return 当目前为止所拒绝的任务数量
*/
long getCount();
/**
* 获取最后一次被拒绝的任务
*
* @return 最后一次被拒绝的任务
*/
Runnable getLastRejectedTask();
}

View File

@@ -0,0 +1,40 @@
package com.serliunx.statemanagement.support;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
* 附带计数的拒绝策略默认实现(丢弃任务并计数)
*
* @author <a href="mailto:serliunx@yeah.net">SerLiunx</a>
* @version 1.0.0
* @since 2025/3/5
*/
public class DefaultCountableRejectedExecutionHandler implements CountableRejectedExecutionHandler {
/**
* 计数器
*/
private final AtomicLong counter = new AtomicLong(0);
/**
* 最后一次被拒绝的任务
*/
private volatile Runnable last = null;
@Override
public long getCount() {
return counter.get();
}
@Override
public Runnable getLastRejectedTask() {
return last;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
last = r;
counter.incrementAndGet();
}
}

View File

@@ -1,9 +1,6 @@
package com.serliunx.statemanagement.support; package com.serliunx.statemanagement.support;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* 线程池相关工具类 * 线程池相关工具类
@@ -22,10 +19,11 @@ public final class ExecutorUtils {
* *
* @return 执行器(线程池) * @return 执行器(线程池)
*/ */
public static Executor adaptiveThreadPool() { public static Executor adaptiveThreadPool(RejectedExecutionHandler rejectedExecutionHandler) {
final int processors = Runtime.getRuntime().availableProcessors(); final int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(processors * 2, processors * 4, 5, return new ThreadPoolExecutor(processors * 2, processors * 4, 5,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(processors * 8), new NamedThreadFactory("state-process-%s")); TimeUnit.MINUTES, new ArrayBlockingQueue<>(processors * 8),
new NamedThreadFactory("state-process-%s"), rejectedExecutionHandler);
} }
} }

View File

@@ -42,8 +42,6 @@ public class MachineTest {
.build(); .build();
stateMachine.publish(PrinterEvent.TURN_ON); stateMachine.publish(PrinterEvent.TURN_ON);
stateMachine.close();
} }
@Test @Test