1. 首页 > 快讯

Java中线程拒绝策略指南

在Java中,线程池(ThreadPoolExecutor)是用于管理和调度线程的一种机制,它通过重用已存在的线程来减少线程创建和销毁的开销,从而提高系统性能。然而,当线程池中的任务队列已满且无法再接受新的任务时,就需要采取拒绝策略来处理这种情况。Java提供了几种内置的线程拒绝策略,并且允许开发者通过实现RejectedExecutionHandler接口来自定义拒绝策略。以下是对Java中线程拒绝策略的详细指南:

一、内置的线程拒绝策略

1.AbortPolicy(中止策略)

  • 概述:这是ThreadPoolExecutor的默认拒绝策略。当任务无法被提交给线程池时,会直接抛出RejectedExecutionException异常。

  • 应用场景:适用于对任务提交失败要求敏感的场景,需要明确知道任务是否被接受并执行。

  • 实现方式:实现RejectedExecutionHandler接口,在rejectedExecution方法中抛出异常。

2.CallerRunsPolicy(调用者运行策略)

  • 概述:当任务无法被提交给线程池时,会由提交任务的线程自己执行该任务。

  • 应用场景:适用于对任务提交失败要求较低的场景,通过调用线程来执行任务,避免任务丢失。

  • 实现方式:实现RejectedExecutionHandler接口,在rejectedExecution方法中使用提交任务的线程来执行任务。

3.DiscardPolicy(丢弃策略)

  • 概述:当任务无法被提交给线程池时,直接丢弃该任务,不会有任何异常抛出。

  • 应用场景:适用于任务执行对系统不是必要的场景,例如,某些日志记录任务或统计任务。

  • 实现方式:实现RejectedExecutionHandler接口,在rejectedExecution方法中不做任何操作,即丢弃任务。

4.DiscardOldestPolicy(丢弃最旧任务策略)

  • 概述:当任务无法被提交给线程池时,会丢弃队列中最早的一个任务,然后尝试再次提交当前任务。

  • 应用场景:适用于希望优先处理新提交的任务的场景。

  • 实现方式:实现RejectedExecutionHandler接口,在rejectedExecution方法中丢弃队列中的第一个任务,并重新尝试提交当前任务。

二、自定义拒绝策略

自定义拒绝策略在Java中是通过实现RejectedExecutionHandler接口来完成的。这个接口定义了一个方法rejectedExecution(Runnable r, ThreadPoolExecutor executor),该方法在任务无法被线程池接受时会被调用。你可以在这个方法里实现自定义的逻辑,比如将任务存储到数据库、发送到另一个队列、记录日志等。

下面是一个自定义拒绝策略的详细说明以及实例:

自定义拒绝策略说明

  1. 定义类:首先,你需要定义一个类来实现RejectedExecutionHandler接口。

  2. 实现方法:在类中实现rejectedExecution方法。你可以在这个方法里编写当任务被拒绝时想要执行的逻辑。

  3. 使用自定义拒绝策略:在创建ThreadPoolExecutor时,将你的自定义拒绝策略作为参数传递给构造器。

实例

假设我们想要实现一个自定义拒绝策略,该策略会将无法执行的任务的详细信息记录到日志中。

importjava.util.concurrent.RejectedExecutionHandler; importjava.util.concurrent.ThreadPoolExecutor; importjava.util.logging.Logger; publicclassCustomRejectionHandlerimplementsRejectedExecutionHandler{ privatestaticfinalLogger logger = Logger.getLogger(CustomRejectionHandler.class.getName()); @Override publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor executor){ // 记录日志,包括任务详情和线程池的状态 logger.warning("Task "+ r.toString() +" was rejected from executor " + executor.toString()
+" with queue size "+ executor.getQueue().size()); // 这里可以根据需要添加更多的处理逻辑,比如尝试将任务提交到另一个线程池等 }
} // 使用自定义拒绝策略的示例 publicclassThreadPoolCustomRejectionExample{ publicstaticvoidmain(String[] args){ // 创建一个ThreadPoolExecutor,使用自定义的拒绝策略 ThreadPoolExecutor executor =newThreadPoolExecutor( 1,1,0L, TimeUnit.MILLISECONDS, newArrayBlockingQueue<>(1), newCustomRejectionHandler()); // 提交任务 for(inti =0; i <3; i++) { finalinttaskId = i;
executor.submit(() -> {
System.out.println("Executing task "+ taskId); try{ // 模拟任务执行时间 Thread.sleep(1000);
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} // 注意:由于队列大小为1,且核心线程数也为1,第三个任务会触发自定义拒绝策略 // 关闭线程池(注意:这里只是示例,实际使用中可能需要等待任务执行完毕) // executor.shutdown(); // try { //     if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { //         executor.shutdownNow(); //     } // } catch (InterruptedException e) { //     executor.shutdownNow(); //     Thread.currentThread().interrupt(); // } }
}

注意:在上面的示例中,我使用了Java的日志记录工具Logger来记录任务被拒绝时的信息。你需要确保你的项目中已经正确配置了日志记录器,以便能够看到日志输出。

另外,请注意,在示例中的main方法末尾,我注释掉了线程池的关闭代码,因为在实际应用中,你可能需要等待所有任务都执行完毕后再关闭线程池。这里为了简化示例,我省略了这部分代码。如果你想要运行示例并看到日志输出,请确保你的IDE或运行环境支持日志记录,并且已经正确配置了日志级别。

如果你想要在实际项目中使用自定义拒绝策略,请确保你的类路径中包含了必要的日志库(如java.util.logging、Log4j、SLF4J等),并正确配置了日志记录器。

三、选择拒绝策略的建议

在选择Java线程池的拒绝策略时,需要根据具体的应用场景和业务需求来做出决策。以下是一些详细的建议,帮助你选择适合的拒绝策略:

1. 了解JDK自带的拒绝策略

JDK自带的线程池拒绝策略主要包括以下几种:

  • AbortPolicy(默认策略):当任务被拒绝时,直接抛出RejectedExecutionException异常。这种策略适用于不允许任务丢失的场景,但需要确保能够正确处理异常,否则可能会打断当前的执行流程。

  • CallerRunsPolicy:当任务被拒绝时,由提交任务的线程(即调用者线程)来执行该任务。这种策略适用于那些允许任务延迟执行,并且不希望任务被丢弃的场景。但需要注意的是,如果提交任务的线程本身就很忙,这可能会导致性能问题。

  • DiscardPolicy:当任务被拒绝时,直接丢弃该任务,不执行也不抛出异常。这种策略适用于那些任务可以被忽略,或者任务执行与否对系统影响不大的场景。

  • DiscardOldestPolicy:当任务被拒绝时,丢弃队列中最老的任务(即最先进入队列但尚未被执行的任务),并尝试重新提交当前任务。这种策略适用于那些可以容忍丢弃旧任务,以便为新任务腾出空间的场景。

2. 自定义拒绝策略的建议

如果JDK自带的拒绝策略无法满足你的需求,你可以通过实现RejectedExecutionHandler接口来自定义拒绝策略。以下是一些自定义拒绝策略的建议:

  • 记录日志:在自定义拒绝策略中,你可以记录被拒绝任务的详细信息,以便后续分析和调试。这有助于你了解系统在高负载下的行为,并据此优化系统配置或代码逻辑。

  • 任务重试:在某些场景下,你可能希望在被拒绝后重新尝试提交任务。这可以通过在自定义拒绝策略中实现重试逻辑来完成。但需要注意的是,过多的重试可能会导致系统资源耗尽,因此需要合理设置重试次数和间隔。

  • 任务降级:当系统负载过高时,你可以考虑将部分非关键任务进行降级处理。例如,将某些实时性要求不高的任务延迟执行或降低执行优先级。这可以通过在自定义拒绝策略中根据任务的重要性进行区分处理来实现。

  • 任务缓存:如果系统能够容忍一定的延迟,并且希望在未来某个时间点再次尝试执行被拒绝的任务,你可以将这些任务缓存起来。这可以通过使用并发安全的队列(如LinkedBlockingQueue)来实现。但需要注意的是,缓存队列的大小需要合理设置,以避免内存溢出等问题。

  • 通知或告警:当任务被拒绝时,你可以通过发送通知或告警信息来提醒相关人员关注系统状态。这有助于及时发现并解决问题,确保系统的稳定性和可靠性。

3. 选择拒绝策略的注意事项

  • 根据业务需求选择:不同的业务场景对任务的处理方式有不同的要求。因此,在选择拒绝策略时,需要充分考虑业务需求,确保所选策略能够满足业务要求。

  • 考虑系统负载和性能:在选择拒绝策略时,需要关注系统负载和性能。例如,如果系统负载已经很高,再采用CallerRunsPolicy可能会导致系统性能进一步下降。

  • 异常处理:对于AbortPolicy等可能抛出异常的拒绝策略,需要确保能够正确处理异常,以避免对系统造成更大的影响。

  • 资源利用:在选择拒绝策略时,需要考虑系统资源的利用情况。例如,如果系统资源充足,可以考虑采用更加积极的拒绝策略(如重试或缓存)来尽可能多地执行任务。

综上所述,选择Java线程池的拒绝策略需要根据具体的应用场景和业务需求来做出决策。在选择过程中,需要充分了解各种拒绝策略的特点和适用场景,并结合系统负载、性能和资源利用情况等因素进行综合考虑。

四、示例代码

以下是一些使用不同拒绝策略的示例代码:

示例1:使用`AbortPolicy`(默认策略)

importjava.util.concurrent.ArrayBlockingQueue; importjava.util.concurrent.ThreadPoolExecutor; importjava.util.concurrent.TimeUnit; publicclassAbortPolicyExample{ publicstaticvoidmain(String[] args){
ThreadPoolExecutor executor =newThreadPoolExecutor( 1,1,0L, TimeUnit.MILLISECONDS, newArrayBlockingQueue<>(1), // 使用默认的AbortPolicy newThreadPoolExecutor.AbortPolicy()
); // 提交任务 for(inti =0; i <3; i++) { finalinttaskId = i;
executor.submit(() -> {
System.out.println("Executing task "+ taskId); try{
TimeUnit.SECONDS.sleep(1);// 模拟任务执行时间 }catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} // 注意:这里没有关闭线程池,因为示例只是为了展示拒绝策略的效果 // 在实际应用中,你应该在适当的时候关闭线程池 }
} // 注意:由于使用了AbortPolicy,并且线程池和队列的容量都很小,第三个任务会被拒绝, // 并抛出RejectedExecutionException异常。但在这个简单的示例中,异常没有被捕获和处理。

示例2:使用`CallerRunsPolicy`

importjava.util.concurrent.ArrayBlockingQueue; importjava.util.concurrent.ThreadPoolExecutor; importjava.util.concurrent.TimeUnit; publicclassCallerRunsPolicyExample{ publicstaticvoidmain(String[] args){
ThreadPoolExecutor executor =newThreadPoolExecutor( 1,1,0L, TimeUnit.MILLISECONDS, newArrayBlockingQueue<>(1), // 使用CallerRunsPolicy newThreadPoolExecutor.CallerRunsPolicy()
); // 提交任务 for(inti =0; i <3; i++) { finalinttaskId = i;
executor.submit(() -> {
System.out.println(Thread.currentThread().getName() +" is executing task "+ taskId); try{
TimeUnit.SECONDS.sleep(1);// 模拟任务执行时间 }catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} // 注意:由于使用了CallerRunsPolicy,第三个任务将由提交它的线程(即main线程)来执行 }
}

示例3:使用`DiscardPolicy`

importjava.util.concurrent.ArrayBlockingQueue; importjava.util.concurrent.ThreadPoolExecutor; importjava.util.concurrent.TimeUnit; publicclassDiscardPolicyExample{ publicstaticvoidmain(String[] args){
ThreadPoolExecutor executor =newThreadPoolExecutor( 1,1,0L, TimeUnit.MILLISECONDS, newArrayBlockingQueue<>(1), // 使用DiscardPolicy newThreadPoolExecutor.DiscardPolicy()
); // 提交任务 for(inti =0; i <3; i++) { finalinttaskId = i;
executor.submit(() -> {
System.out.println("Executing task "+ taskId); try{
TimeUnit.SECONDS.sleep(1);// 模拟任务执行时间 }catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} // 注意:由于使用了DiscardPolicy,第三个任务将被丢弃,不会有任何输出或异常 }
}

本文采摘于网络,不代表本站立场,转载联系作者并注明出处:https://www.iotsj.com//kuaixun/3552.html

联系我们

在线咨询:点击这里给我发消息

微信号:666666