关闭 x
IT技术网
    技 采 号
    ITJS.cn - 技术改变世界
    • 实用工具
    • 菜鸟教程
    IT采购网 中国存储网 科技号 CIO智库

    IT技术网

    IT采购网
    • 首页
    • 行业资讯
    • 系统运维
      • 操作系统
        • Windows
        • Linux
        • Mac OS
      • 数据库
        • MySQL
        • Oracle
        • SQL Server
      • 网站建设
    • 人工智能
    • 半导体芯片
    • 笔记本电脑
    • 智能手机
    • 智能汽车
    • 编程语言
    IT技术网 - ITJS.CN
    首页 » JAVA »Java多线程编程模式实战指南:Active Object模式(下)

    Java多线程编程模式实战指南:Active Object模式(下)

    2014-11-29 00:00:00 出处:希望未来
    分享

    微信扫一扫:分享

    Scan me!

    微信里点“发现”,扫一下

    二维码便可将本文分享至朋友圈。

    Active Object模式的评价与实现考量

    Active Object模式通过将方法的调用与执行分离,实现了异步编程。有利于提高并发性,从而提高系统的吞吐率。

    Active Object模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步方法)和任务的执行策略(Execution Policy)分离。任务的执行策略被封装在Scheduler的实现类之内,因此它对外是不“可见”的,一旦需要变动也不会影响其它代码,降低了系统的耦合性。任务的执行策略可以反映以下一些问题:

    采用什么顺序去执行任务,如FIFO、LIFO、或者基于任务中包含的信息所定的优先级? 多少个任务可以并发执行? 多少个任务可以被排队等待执行? 如果有任务由于系统过载被拒绝,此时哪个任务该被选中作为牺牲品,应用程序该如何被通知到? 任务执行前、执行后需要执行哪些操作?

    这意味着,任务的执行顺序可以和任务的提交顺序不同,可以采用单线程也可以采用多线程去执行任务等等。

    当然,好处的背后总是隐藏着代价,Active Object模式实现异步编程也有其代价。该模式的参与者有6个之多,其实现过程也包含了不少中间的处理:MethodRequest对象的生成、MethodRequest对象的移动(进出缓冲区)、MethodRequest对象的运行调度和线程上下文切换等。这些处理都有其空间和时间的代价。因此,Active Object模式适合于分解一个比较耗时的任务(如涉及I/O操作的任务):将任务的发起和执行进行分离,以减少不必要的等待时间。

    虽然模式的参与者较多,但正如ITJS的这篇文章案例的实现代码所展示的,其中大部分的参与者我们可以利用JDK自身提供的类来实现,以节省编码时间。如表1所示。

    表 1. 使用JDK现有类实现Active Object的一些参与者

    参与者名称 可以借用的JDK类 备注
    Scheduler Java Executor Framework中的java.util.concurrent.ExecutorService接口的相关实现类,如java.util.concurrent.ThreadPoolExecutor。 ExecutorService接口所定义的submit(Callable<T> task)方法相当于图2中的enqueue方法。
    ActivationQueue java.util.concurrent.LinkedBlockingQueue 若Scheduler采用java.util.concurrent.ThreadPoolExecutor,则java.util.concurrent.LinkedBlockingQueue实例作为ThreadPoolExecutor构造器的参数。
    MethodRequest java.util.concurrent.Callable接口的匿名实现类。 Callable接口比起Runnable接口的优势在于它定义的call方法有返回值,便于将该返回值传递给Future实例。
    Future java.util.concurrent.Future ExecutorService接口所定义的submit(Callable<T> task)方法的返回值类型就是java.util.concurrent.Future。

    错误隔离

    错误隔离指一个任务的处理失败不影响其它任务的处理。每个MethodRequest实例可以看作一个任务。那么,Scheduler的实现类在执行MethodRequest时需要注意错误隔离。选用JDK中现成的类(如ThreadPoolExecutor)来实现Scheduler的一个好处就是这些类可能已经实现了错误隔离。而如果自己编写代码实现Scheduler,用单个Active Object工作线程逐一执行所有任务,则需要特别注意线程的run方法的异常处理,确保不会因为个别任务执行时遇到一些运行时异常而导致整个线程终止。如清单6的示例代码所示。

    清单 6. 自己动手实现Scheduler的错误隔离示例代码

    public class CustomScheduler implements Runnable {
    	private LinkedBlockingQueue<Runnable> activationQueue = 
    		new LinkedBlockingQueue<Runnable>();
    
    	@Override
    	public void run() {
    		dispatch();
    	}
    
    	public <T> Future<T> enqueue(Callable<T> methodRequest) {
    		final FutureTask<T> task = new FutureTask<T>(methodRequest) {
    
    			@Override
    			public void run() {
    				try {
    				   super.run();
    				//捕获所以可能抛出的对象,避免该任务运行失败而导致其所在的线程终止。	
    				} catch (Throwable t) {
    				   this.setException(t);
    				}
    			}
    
    		};
    
    		try {
    			activationQueue.put(task);
    		} catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    		}
    		return task;
    	}
    
    	public void dispatch() {
    		while (true) {
    			Runnable methodRequest;
    			try {
    				methodRequest = activationQueue.take();
    
    				//防止个别任务执行失败导致线程终止的代码在run方法中
    				methodRequest.run();
    			} catch (InterruptedException e) {
    				// 处理该异常
    			}
    
    		}
    	}
    }

    缓冲区监控

    如果ActivationQueue是有界缓冲区,则对缓冲区的当前大小进行监控无论是对于运维还是测试来说都有其意义。从测试的角度来看,监控缓冲区有助于确定缓冲区容量的建议值(合理值)。清单3所示的代码,即是通过定时任务周期性地调用ThreadPoolExecutor的getQueue方法对缓冲区的大小进行监控。当然,在监控缓冲区的时候,往往只需要大致的值,因此在监控代码中要避免不必要的锁。

    缓冲区饱和处理策略

    当任务的提交速率大于任务的执行数率时,缓冲区可能逐渐积压到满。这时新提交的任务会被拒绝。无论是自己编写代码还是利用JDK现有类来实现Scheduler,对于缓冲区满时新任务提交失败,我们需要一个处理策略用于决定此时哪个任务会成为“牺牲品”。若使用ThreadPoolExecutor来实现Scheduler有个好处是它已经提供了几个缓冲区饱和处理策略的实现代码,应用代码可以直接调用。如清单3的代码所示,ITJS的这篇文章案例中我们选择了抛弃最老的任务作为处理策略。java.util.concurrent.RejectedExecutionHandler接口是ThreadPoolExecutor对缓冲区饱和处理策略的抽象,JDK中提供的具体实现如表2所示。

    表 2. JDK提供的缓冲区饱和处理策略实现类

    实现类 所实现的处理策略
    ThreadPoolExecutor.AbortPolicy 直接抛出异常。
    ThreadPoolExecutor.DiscardPolicy 放弃当前被拒绝的任务(而不抛出任何异常)。
    ThreadPoolExecutor.DiscardOldestPolicy 将缓冲区中最老的任务放弃,然后重新尝试接纳被拒绝的任务。
    ThreadPoolExecutor.CallerRunsPolicy 在任务的提交方线程中运行被拒绝的任务。

    当然,对于ThreadPoolExecutor而言,其工作队列满不一定就意味着新提交的任务会被拒绝。当其最大线程池大小大于其核心线程池大小时,工作队列满的情况下,新提交的任务会用所有核心线程之外的新增线程来执行,直到工作线程数达到最大线程数时,新提交的任务会被拒绝。

    Scheduler空闲工作线程清理

    如果Scheduler采用多个工作线程(如采用ThreadPoolExecutor这样的线程池)来执行任务。则可能需要清理空闲的线程以节约资源。清单3的代码就是直接使用了ThreadPoolExecutor的现有功能,在初始化其实例时通过指定其构造器的第3、4个参数( long keepAliveTime, TimeUnit unit),告诉ThreadPoolExecutor对于核心工作线程以外的线程若其已经空闲了指定时间,则将其清理掉。

    可复用的Active Object模式实现

    尽管利用JDK中的现成类可以极大地简化Active Object模式的实现。但如果需要频繁地在不同场景下使用Active Object模式,则需要一套更利于复用的代码,以节约编码的时间和使代码更加易于理解。清单7展示一段基于Java动态代理的可复用的Active Object模式的Proxy参与者的实现代码。

    清单 7. 可复用的Active Object模式Proxy参与者实现

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    public abstract class ActiveObjectProxy {
    
    	private static class DispatchInvocationHandler implements InvocationHandler {
    		private final Object delegate;
    		private final ExecutorService scheduler;
    
    		public DispatchInvocationHandler(Object delegate,
    		    ExecutorService executorService) {
    			this.delegate = delegate;
    			this.scheduler = executorService;
    		}
    
    		private String makeDelegateMethodName(final Method method,
    		    final Object[] arg) {
    			String name = method.getName();
    			name = "do" + Character.toUpperCase(name.charAt(0)) 
    					+ name.substring(1);
    
    			return name;
    		}
    
    		@Override
    		public Object invoke(final Object proxy, final Method method,
    		    final Object[] args) throws Throwable {
    
    			Object returnValue = null;
    			final Object delegate = this.delegate;
    			final Method delegateMethod;
    
    			//如果拦截到的被调用方法是异步方法,则将其转发到相应的doXXX方法
    			if (Future.class.isAssignableFrom(method.getReturnType())) {
    				delegateMethod = delegate.getClass().getMethod(
    					makeDelegateMethodName(method, args),
    					method.getParameterTypes());
    
    				final ExecutorService scheduler = this.scheduler;
    
    				Callable<Object> methodRequest = new Callable<Object>() {
    					@Override
    					public Object call() throws Exception {
    						Object rv = null;
    
    						try {
                              rv = delegateMethod.invoke(delegate, args);
    						} catch (IllegalArgumentException e) {
    							throw new Exception(e);
    						} catch (IllegalAccessException e) {
    							throw new Exception(e);
    						} catch (InvocationTargetException e) {
    							throw new Exception(e);
    						}
    						return rv;
    					}
    				};
    				Future<Object> future = scheduler.submit(methodRequest);
    				returnValue = future;
    
    			} else {
    
    				//若拦截到的方法调用不是异步方法,则直接转发
    			delegateMethod = delegate.getClass()
    			.getMethod(method.getName(),method.getParameterTypes());
    
    				returnValue = delegateMethod.invoke(delegate, args);
    			}
    
    			return returnValue;
    		}
    	}
    
    	/**
    	 * 生成一个实现指定接口的Active Object proxy实例。
    	 * 对interf所定义的异步方法的调用会被装发到servant的相应doXXX方法。
    	 * @param interf 要实现的Active Object接口
    	 * @param servant Active Object的Servant参与者实例
    	 * @param scheduler Active Object的Scheduler参与者实例
    	 * @return Active Object的Proxy参与者实例
    	 */
    	public static <T> T newInstance(Class<T> interf, Object servant,
    	    ExecutorService scheduler) {
    
    		@SuppressWarnings("unchecked")
    		T f = (T) Proxy.newProxyInstance(interf.getClassLoader(),
    		new Class[] { interf }, 
    		new DispatchInvocationHandler(servant, scheduler));
    
    		return f;
    	}
    }

    清单7的代码实现了可复用的Active Object模式的Proxy参与者ActiveObjectProxy。ActiveObjectProxy通过使用Java动态代理,动态生成指定接口的代理对象。对该代理对象的异步方法(即返回值类型为java.util.concurrent.Future的方法)的调用会被ActiveObjectProxy实现InvocationHandler(DispatchInvocationHandler)所拦截,并转发给ActiveObjectProxy的newInstance方法中指定的Servant处理。

    清单8所示的代码展示了通过使用ActiveObjectProxy快速Active Object模式。

    清单 8. 基于可复用的API快速实现Active Object模式

    public static void main(String[] args) throws 
    	InterruptedException, ExecutionException {
    
    	SampleActiveObject sao = ActiveObjectProxy.newInstance(
    		    SampleActiveObject.class, new SampleActiveObjectImpl(),
    		    Executors.newCachedThreadPool());
    	Future<String> ft = sao.process("Something", 1);
    	Thread.sleep(500);
    	System.out.println(ft.get());

    从清单8的代码可见,利用可复用的Active Object模式Proxy实现,应用开发人员只要指定Active Object模式对外保留的接口(对应ActiveObjectProxy.newInstance方法的第1个参数),并提供一个该接口的实现类(对应ActiveObjectProxy.newInstance方法的第2个参数),再指定一个java.util.concurrent.ExecutorService实例(对应ActiveObjectProxy.newInstance方法的第3个参数)即可以实现Active Object模式。

    总结

    ITJS的这篇文章介绍了Active Object模式的意图及架构。并提供了一个实际的案例用于展示使用Java代码实现Active Object模式,在此基础上对该模式进行了评价并分享了在实际运用该模式时需要注意的事项。

    参考资源

    ITJS的这篇文章的源代码在线阅读:https://github.com/Viscent/JavaConcurrencyPattern/ 维基百科Active Object模式词条:http://en.wikipedia.org/wiki/Active_object Douglas C. Schmidt对Active Object模式的定义:http://www.laputan.org/pub/sag/act-obj.pdf。 Schmidt, Douglas et al. Pattern-Oriented Software Architecture Volume 2: Patterns for Concurrent and Networked Objects. Volume 2. Wiley, 2000 Java theory and practice: Decorating with dynamic proxies:http://www.ibm.com/developerworks/java/library/j-jtp08305/index.html
    上一篇返回首页 下一篇

    声明: 此文观点不代表本站立场;转载务必保留本文链接;版权疑问请联系我们。

    别人在看

    抖音安全与信任开放日:揭秘推荐算法,告别单一标签依赖

    ultraedit编辑器打开文件时,总是提示是否转换为DOS格式,如何关闭?

    Cornell大神Kleinberg的经典教材《算法设计》是最好入门的算法教材

    从 Microsoft 下载中心安装 Windows 7 SP1 和 Windows Server 2008 R2 SP1 之前要执行的步骤

    Llama 2基于UCloud UK8S的创新应用

    火山引擎DataTester:如何使用A/B测试优化全域营销效果

    腾讯云、移动云继阿里云降价后宣布大幅度降价

    字节跳动数据平台论文被ICDE2023国际顶会收录,将通过火山引擎开放相关成果

    这个话题被围观超10000次,火山引擎VeDI如此解答

    误删库怎么办?火山引擎DataLeap“3招”守护数据安全

    IT头条

    平替CUDA!摩尔线程发布MUSA 4性能分析工具

    00:43

    三起案件揭开侵犯个人信息犯罪的黑灰产业链

    13:59

    百度三年开放2.1万实习岗,全力培育AI领域未来领袖

    00:36

    工信部:一季度,电信业务总量同比增长7.7%,业务收入累计完成4469亿元

    23:42

    Gartner:2024年全球半导体营收6559亿美元,AI助力英伟达首登榜首

    18:04

    技术热点

    iOS 8 中如何集成 Touch ID 功能

    windows7系统中鼠标滑轮键(中键)的快捷应用

    MySQL数据库的23个特别注意的安全事项

    Kruskal 最小生成树算法

    Ubuntu 14.10上安装新的字体图文教程

    Ubuntu14更新后无法进入系统卡在光标界面解怎么办?

      友情链接:
    • IT采购网
    • 科技号
    • 中国存储网
    • 存储网
    • 半导体联盟
    • 医疗软件网
    • 软件中国
    • ITbrand
    • 采购中国
    • CIO智库
    • 考研题库
    • 法务网
    • AI工具网
    • 电子芯片网
    • 安全库
    • 隐私保护
    • 版权申明
    • 联系我们
    IT技术网 版权所有 © 2020-2025,京ICP备14047533号-20,Power by OK设计网

    在上方输入关键词后,回车键 开始搜索。Esc键 取消该搜索窗口。