javascript访问数据库
一、概述在上一篇《原理篇》(点击文字可查看)中,我们看到了异步非阻塞模型,它能够有效降低线程io状态的耗时,提升资源利用率和系统吞吐量。异步api可以表现为listener或promise形式——其中promiseapi提供了更强的灵活性,支持同步返回和异步回调,也允许注册任意数目的回调。在本文《应用篇》中,我们将进一步探索异步模式和promise的应用:第二章:promise与线程池。在异步执行耗时请求时,executorservice+future是一个备选方案;但是相比于future,promise支持纯异步获取响应数据,能够消除更多阻塞。第三章:异常处理。java程序并不总能成功执行请求,有时会遇到网络问题等不可抗力。对于无法避免的异常情况,异步api必须提供异常处理机制,以提升程序的容错性。第四章:请求调度。java程序有时需要提交多条请求,这些请求之间可能存在一定的关联关系,包括顺序执行、并行执行、批量执行。异步api需要对这些约束提供支持。本文不限定promise的具体实现,读者在生产环境可以选择一个promise工具类(如nettydefaultpromise[a]、jdkcompletablefuture[b]等);此外,由于promise的原理并不复杂,读者也可以自行实现所需功能。二、promise与线程池java程序有时需要执行耗时的io操作,如数据库访问。在此期间,相比于纯内存计算,io操作的持续时间明显更长。为了减少io阻塞、提高资源利用率,我们应该使用异步模型,将请求提交到其他线程中执行,从而连续提交多条请求,而不必等待之前的请求返回。本章对几种io模型进行对比(见2.1节),考察调用者线程的阻塞情况。其中,promise支持纯异步的请求提交及响应数据处理,能够最大程度地消除不必要的阻塞。在实际项目中,如果底层api不支持纯异步,那么我们也可以进行适当重构,使其和promise兼容(见2.2节)。2.1对比:同步、future、promise本节对几种io模型进行对比,包括同步io、基于线程池(executorservice)的异步io、基于promise的异步io,考察调用者线程的阻塞情况。假设我们要执行数据库访问请求。由于需要跨越网络,单条请求需要进行耗时的io操作,才能最终收到响应数据;但是请求之间没有约束,允许随时提交新的请求,而不需要收到之前的响应数据。首先我们来看看几种模型的样例代码:1.同步io。db.writesync()方法是同步阻塞的。函数阻塞,直至收到响应数据。因此,调用者一次只能提交一个请求,必须等待该请求返回,才能再提交下一个请求。/*提交请求并阻塞,直至收到响应数据*/stringresult=db.writesync("data");process(result);2.基于线程池(executorservice)的异步io。db.writesync()方法不变;但是将其提交到线程池中来执行,使得调用者线程不会阻塞,从而可以连续提交多条请求data1-3。提交请求后,线程池返回future对象,调用者调用future.get()以获取响应数据。future.get()方法是阻塞的,因此调用者在获得响应数据之前无法再提交后续请求。/*提交请求*///executor:executorservicefutureresultfuture1=executor.submit(()->db.writesync("data1"));futureresultfuture2=executor.submit(()->db.writesync("data2"));futureresultfuture3=executor.submit(()->db.writesync("data3"));/*获取响应:同步*/stringresult1=resultfuture1.get();stringresult2=resultfuture2.get();stringresult3=resultfuture3.get();process(result1);process(result2);process(result3);3.基于promise的异步io。db.writeasync()方法是纯异步的,提交请求后返回promise对象;调用者调用promise.await()注册回调,当收到响应数据后触发回调。在《原理篇》中,我们看到了promiseapi可以基于线程池或响应式模型实现;不论哪种方式,回调函数可以在接收响应的线程中执行,而不需要调用者线程阻塞地等待响应数据。/*提交请求*/promiseresultpromise1=db.writeasync("data1");promiseresultpromise2=db.writeasync("data2");promiseresultpromise3=db.writeasync("data3");/*获取响应:异步*/resultpromise1.await(result1->process(result1));resultpromise2.await(result2->process(result2));resultpromise3.await(result3->process(result3));接下来我们看看以上几种模型中,调用者线程状态随时间变化的过程,如图2-1所示。a.同步io。调用者一次只能提交一个请求,在收到响应之前不能提交下一个请求。b.基于线程池的异步io。同一组请求(请求1-3,以及请求4-6)可以连续提交,而不需要等待前一条请求返回。然而,一旦调用者使用future.get()获取响应数据(result1-3),就会阻塞而无法再提交下一组请求(请求4-6),直至实际收到响应数据。c.基于promise的异步io。调用者随时可以提交请求,并向promise注册对响应数据的回调函数;稍后接收线程向promise通知响应数据,以触发回调函数。上述过程中,调用者线程不需要等待响应数据,始终不会阻塞。图2-1a线程时间线:同步io图2-1b线程时间线:基于线程池的异步io图2-1c线程时间线:基于promise的异步io综上,future只能消除提交请求的阻塞;而promise不仅能消除提交请求的阻塞,也能消除处理响应数据的阻塞,是真正的“纯异步”模式。2.2promise结合线程池和executorservice+future相比,promise具有纯异步的优点。然而在某些场景下也需要把promise和线程池结合使用。例如:底层api只支持同步阻塞模型,不支持纯异步;此时只能在线程池中调用api,才能做到非阻塞。需要重构一段遗留代码,将其线程模型从线程池模型改为响应式模型;可以先将对外接口改为promiseapi,而底层实现暂时使用线程池。下面的代码片段展示了promise和线程池结合的用法:1.创建promise对象作为返回值。注意这里使用了promiseorexception,以防期间遇到异常;其可以通知响应数据,也可以在失败时通知抛出的exception。详见3.2小节。2.在线程池中执行请求(2a),并在收到响应数据后向promise通知(2b)3.处理线程池满异常。线程池底层关联一个blockingqueue来存储待执行的任务,一般设置为有界队列以防无限占用内存,当队列满时会丢弃某个任务。为了向调用者通知该异常,线程池的拒绝策略须设置为abortpolicy,当队列满时丢弃所提交的任务,并抛出rejectedexecutionexception;一旦捕获该异常,就要向promise通知请求失败。publicpromiseorexceptionwriteasync(){//1.创建promise对象promiseorexceptionresultpromise=newpromiseorexception{stringresult=db.writesync("data");//2a.执行请求。只支持同步阻塞resultpromise.signalallwithresult(result);//2b.通知promise});}catch(rejectedexecutionexceptione){//3.异常:线程池满resultpromise.signalallwithexception(e);}returnresultpromise;}三、异常处理:promiseorexceptionjava程序有时会遇到不可避免的异常情况,如网络连接断开。因此,开发者需要设计适当的异常处理机制,以提升程序的容错性。本章介绍异步api的异常处理,首先介绍java语言异常处理规范;然后介绍promise的变体promiseorexception,使得promiseapi支持规范的异常处理。3.1异常处理规范个人认为,java代码的异常处理应当符合下列规范:1.显式区分正常出口和异常出口。2.支持编译时刻检查,强制调用者处理不可避免的异常。区分正常出口和异常出口异常是java语言的重要特性,是一种基本的控制流。java语言中,一个函数允许有一个返回值,以及抛出多个不同类型的异常。函数的返回值是正常出口,函数返回说明函数能够正常工作,并计算出正确的结果;相反,一旦函数遇到异常情况无法继续工作,如网络连接断开、请求非法等,就要抛出相应的异常。虽然if-else和异常都是控制流,但是程序员必须辨析二者的使用场景。if-else的各个分支一般是对等的,都用于处理正常情况;而函数的返回值和异常是不对等的,抛出异常表示函数遇到无法处理的故障,已经无法正常计算结果,其与函数正常工作所产生的返回值有本质区别。在api设计中,混淆正常出口(返回值)与异常出口(抛出异常),或者在无法继续工作时不抛异常,都是严重的设计缺陷。以数据库访问为例,下面的代码对比了api进行异常处理的两种形式。数据库访问过程中,如果网络连接顺畅,并且服务端能够正确处理请求,那么db.write()应该返回服务端的响应数据,如服务端为所写数据生成的自增id、条件更新实际影响的数据条数等;如果网络连接断开,或者客户端和服务端版本不匹配导致请求无法解析,从而无法正常工作,那么db.write()应该抛出异常以说明具体原因。从“是否正常工作”的角度看,上述两种情况的性质是截然不同的,显然应该选用异常作为控制流,而不是if-else。/*正确*/try{stringresult=db.write("data");process(result);//正常出口}catch(exceptione){log.error("writefails",e);//异常出口}/*错误*/stringresultorerror=db.write("data");if(resultorerror.equals("ok")){process(resultorerror);//正常出口}else{log.error("writefails,error:"+resultorerror);//异常出口}强制处理不可避免的异常java语言的异常处理体系中,异常主要分为以下几类:exception、runtimeexception、error;三者都是throwable的子类,即可以被函数抛出。注意,由于runtimeexception是exception的子类,本文为避免混淆,“exception”特指“是exception但不是runtimeexception”的那些异常。个人认为,几种异常类型分别用于下列场景:1.exception:程序外部的不可抗力造成的异常情况,如网络连接断开。即使java代码完美无瑕,也绝对不可能避免这类异常(拔掉网线试试!)。既然无法避免,这种异常就应当强制处理,以提升系统的容错能力。2.runtimeexception:编程错误造成的异常情况,如数组下标越界arrayoutofboundexception、参数不符合取值范围illegalargumentexception等。如果程序员对api的输入约束了如指掌,并在调用api之前对函数参数进行适当校验,那么runtimeexception是可以绝对避免的(除非被调api在应当抛exception处,实际抛出了runtimeexception)。既然可以避免,这种异常就没有必要强制处理。当然,人无完人。假设程序员真的违背了某些约束,函数抛出runtimeexception且未被处理,那么作为惩罚,线程或进程会退出,从而提醒程序员改正错误代码。如果线程或进程必须常驻,就要对runtimeexception进行兜底,如下面的代码所示。这里将代码缺陷视为无法避免的异常情况,捕获异常后可以记录日志、触发告警,提醒稍后来修正缺陷。newthread(()->{while(true){try{dosomething();}catch(runtimeexceptione){//对runtimeexception进行兜底,以防线程中断log.error("erroroccurs",e);}}});3.error:jvm内部定义的异常,如outofmemoryerror。业务逻辑一般不抛出error,而是抛出某种exception或runtimeexception。上述几类异常中,只有exception是强制处理的,称为checkedexception[c]。如下所示是一个checkedexception的例子。数据库访问db.write()抛出exception,表示遇到网络断开、消息解析失败等不可抗情况。异常类型为exception而不是runtimeexception,以强制调用者添加catch子句处理上述情况;如调用者遗漏了catch子句,则编译器会报错,从而提示调用者“这里一定会遇到异常情况,必须进行处理”,以完善程序容错能力。/***抛出异常,如果:*1.网络连接断开*2.消息无法解析*3.业务逻辑相关,如服务端扣款时发现余额不足*4.……//任何无法避免的情况,都应该抛出exception!*/publicstringwrite(objectdata)throwsexception{return"";}/***处理异常*/try{stringresult=db.write("data");process(result);}catch(exceptione){//如遗漏catch子句,则编译不通过log.error("writefails,db:...,data:...",e);}3.2promiseapi的异常处理上一小节讨论了异常处理的规范:显式区分正常出口和异常出口;不可抗的异常,要在编译时刻强制处理。下面的代码展示了promiseapi要如何设计异常处理机制,以符合上述规范。1.使用promiseorexception来通知响应数据和异常。promiseorexception是promise的子类,泛型模版x为数据对象resultorexception,其含有2个字段result和e:e==null表示正常,此时字段result有效;e!=null表示异常,此时不要使用字段result。2.在“重载1”中,调用者从回调函数中获得resultorexception对象。调用resultorexception.get()获取响应数据result,或者get()方法抛出异常e。这种方式的代码结构和传统的异常处理一致,可以使用多个catch子句分别处理不同类型的异常。3.在“重载2”中,调用者从回调函数中直接获得result和e。含义同上。这种方式省去了resultorexception.get();但是如果需要处不同类型的异常,则需要用einstanceofmyexception来判断异常类型。//extendspromise>promiseorexceptionresultpromsie=db.writeasync("data");/*重载1*/resultpromsie.await(resultorexception->{try{stringresult=resultorexception.get();process(result);//正常出口}catch(exceptione){log.error("writefails",e);//异常出口}});/*重载2*/resultpromise.await((result,e)->{if(e==null){process(result);//正常出口}else{log.error("writefails",e);//异常出口}});promiseorexception符合上一小节提出的异常处理规范,具有如下优点:1.区分正常出口和异常出口。响应数据和异常分别使用result和e两个变量来传递,可以靠e==null来判断是否正常。注意result==null不能作为判断条件,因为null有可能是响应数据的合法值。2.强制处理异常。不论使用哪一种回调,不存在一种代码结构能够只获得result而不获得e,因此语法上不会遗漏e的异常处理。3.允许定义异常类型。promiseorexception的泛型模版e填为excetion不是必需的,也可以填为任意其他类型。注意,受限于java语法,泛型模版处只允许填写一种异常类型,而不像函数抛异常那样允许抛出多种异常。为应对这种限制,我们只能为api定义一个异常父类,调用者用catch子句或instanceof进行向下转型。当然,这种“定义异常父类”的做法也是可以接受的,并在现有工具中广泛应用,因为可以将工具所抛异常区别于java语言内置的异常类型。最后,在异常处理结构方面个人提出一个建议:全部异常通过promiseorexception来通知,而api本身不要抛出异常。以数据库访问apiwriteasync()为例,面的代码对比了两种抛异常的方式。正确的做法是promiseorexception作为唯一出口,如果api底层实现抛出异常(submit()throwsexception),则应该将异常封装于promiseorexception对象,而不应该直接从api函数抛出(writeasync()throwsexception)。/*正确:唯一出口promiseorexception*/publicpromiseorexceptionwriteasync(objectdata){try{submit(data);//throwsexception}catch(exceptione){returnpromiseorexception.immediatelyexception(e);}promiseorexceptionresultpromise=newpromiseorexception();returnresultpromise;}如果错误地设计了含有两个异常出口的api,调用者就不得不重复书写异常处理逻辑,如下面的代码所示。try{promiseorexceptionresultpromise=db.writeasync("data");resultpromise.await((result,e)->{if(e==null){process(result);//正常出口}else{log.error("writefails",e);//异常出口2}});}catch(exceptione){log.error("writefails",e);//异常出口1}四、请求调度java程序中有时需要提交多条异步请求,且这些请求之间存在一定的关联关系。在异步非阻塞场景下,这些关联关系都可以借助promise来实现。1.顺序请求,如图4-1所示。后一条请求依赖前一条请求的响应数据;因此,必须等待前一条请求返回,才能构造并提交下一条请求。图4-1顺序请求2.并行请求,如图4-2所示。一次提交多条请求,然后等待全部请求返回。所提交的请求之间没有依赖关系,因此可以同时执行;但是必须收到每条请求的响应数据(发生channelread()事件,事件参数为响应数据),才能执行实际的处理process(result1,2,3)。图4-2并行请求3.批量请求,如图4-3所示。调用者连续提交多条请求,但是暂存在队列中(offer()),而不是立刻执行。一段时间后,从队列中取出若干条请求,组装为批量请求来提交(writebulk());当收到批量请求的响应消息时,可以从中取出每条请求的响应数据。由于每次网络io都带来额外开销,故实际应用中经常使用批量请求来减少网络io频率,以提升总体吞吐量。图4-3批量请求4.1顺序请求:promise.then()假设一系列操作需要依次完成,即前一操作完成后,才能开始执行下一操作;如果这些操作均表现为promiseapi,我们可以对promise.await(listener)进行封装,使代码结构更加简洁。如下所示是一个异步promiseapi。submit方法提交请求request并返回promise对象;当收到响应数据时,该promise对象被通知。/***异步promiseapi*/publicstaticpromisesubmit(objectrequest){promiseresultpromise=newpromise{submit("b").await(resultb->{submit("c").await(resultc->{submit("d").await(resultd->{submit("e").await(resulte->{process(resulte);});});});});});为改进代码结构,我们对promise.await(consumer)方法进行封装,提供promise.then(function>)方法,如下所示。类似于await(),then()也可以注册一个回调函数resultx->submit("x+1"),回调函数处理响应数据resultx,并提交下一请求submit("x+1");then()的返回值即submit("x+1")的返回值,用于通知下一请求的响应数据resultx+1。promiseresultpromisea=submit("a");promiseresultpromiseb=resultpromisea.then(resulta->submit("b"));promiseresultpromisec=resultpromiseb.then(resultb->submit("c"));promiseresultpromised=resultpromisec.then(resultc->submit("d"));promiseresultpromisee=resultpromised.then(resultd->submit("e"));resultpromisee.await(resulte->process(resulte));接下来,我们将中间变量resultpromisea-e内联,即得到基于then()的链式调用结构。相比于await(),then()消除了套娃般的嵌套回调。submit("a").then(resulta->submit("b"))//返回resultpromiseb.then(resultb->submit("c"))//返回resultpromisec.then(resultc->submit("d"))//返回resultpromised.then(resultd->submit("e"))//返回resultpromisee.await(resulte->process(resulte));最后,我们来看一下promise.then()的一种简单实现,如下所示:1.then()方法提供一个泛型模版next,以说明下一请求的响应数据类型。2.根据泛型模版next,then()内部创建promise作为返回值,用于通知下一请求的响应数据。3.对于当前请求,调用await()注册响应数据的回调result;当收到响应数据后,执行函数func,以提交下一请求:func.apply(result)。4.当收到下一请求的响应数据后,promise被通知:nextpromise::signalall。publicpromisethen(function>func){promisenextpromise=newpromise{promiseresultpromisenext=func.apply(result);resultpromisenext.await(nextpromise::signalall);});returnnextpromise;}注意,这里只展示了纯异步重载promise.then(function>)。根据回调函数是否有返回值、同步执行还是异步执行,promise可以提供then()的更多种重载;受限于java语法,如编译器无法辨析各个重载,则可以使用函数名称进行显式区别,如:thenrun(runnable)thenaccept(consumer)thenapply(function)thenapplyasync(function>)4.2并行请求:latchpromise上一小节介绍了“顺序请求”的场景,即多条请求需要依次执行。而“并行请求”场景下,多条请求之间没有顺序约束,但是我们仍然需要等待全部请求返回,才能执行后续操作。例如,我们需要查询多张数据库表,这些查询语句可以同时执行;但是必须等待每条查询都返回,我们才能获得完整信息。jdk提供countdownlatch来实现这一场景,但是其只支持同步等待;作为改进,我们采用latchpromise实现相同的功能,并且支持纯异步api。以数据库访问为例,如下所示的代码展示了latchpromise的使用:1.提交3条请求,并获取每个请求所对应的promise对象resultpromise1-3,以获取响应数据。2.创建latchpromise对象,并向其注册需要等待的promise对象resultpromise1-3。3.latchpromise.untilallsignaled()返回一个promise对象allsignaled。当所注册的resultpromise1-3均被通知后,allsignaled会被通知。allsignaled的类型为voidpromise,表示allsignaled被通知时没有需要处理的响应数据。4.在allsignaled上注册回调,在回调函数中调用resultpromisex.await()获取实际的响应数据;此时由于请求已执行完毕,故await()立刻返回而不阻塞。/*创建promise对象*/promiseresultpromise1=db.writeasync("a");promiseresultpromise2=db.writeasync("b");promiseresultpromise3=db.writeasync("c");/*向latchpromise注册要等待的promise*/latchpromiselatch=newlatchpromise();latch.add(resultpromise1);latch.add(resultpromise2);latch.add(resultpromise3);/*等待全部promise被通知*/voidpromiseallsignaled=latch.untilallsignaled();allsignaled.await(()->{stringresult1=resultpromise1.await();stringresult2=resultpromise2.await();stringresult3=resultpromise3.await();process(result1,result2,result3);});作为对比,下面的代码使用countdownlatch实现相同功能,但是存在以下缺陷:1.countdownlatch.await()只支持同步等待。在纯异步场景下是无法接受的。2.countdownlatch对业务逻辑有侵入性。开发者需要在业务逻辑中添加对countdownlatch.countdown()的调用,以控制countdownlatch的时序;相反,latchpromise依赖本来就已经存在的resultpromise对象,而不需要编写额外的时序控制代码。3.countdownlatch引入了冗余逻辑。创建countdownlatch时,必须在构造参数中填写要等待的请求数;因此,一旦所提交的请求的数目改变,就必须相应地更新创建countdownlatch的代码,修改构造参数。countdownlatchlatch=newcountdownlatch(3);resultpromise1.await(result1->latch.countdown());resultpromise2.await(result2->latch.countdown());resultpromise3.await(result3->latch.countdown());latch.await();stringresult1=resultpromise1.await();stringresult2=resultpromise2.await();stringresult3=resultpromise3.await();process(result1,result2,result3);最后,我们来看一下latchpromise的参考实现。代码原理如下所示:1.设立countunfinished变量,记录还没有被通知的promise对象的数目。每当注册一个promise对象,countunfinished递增;每当一个promise被通知,countunfinished递减。当countunfinished减到0时,说明所注册全部promise对象都被通知了,故通知allsignaled。2.设立nomore变量,记录是否还需要继续注册新的promise对象,仅当调用了untilallsignaled()才认为完成注册;在此之前,即使countunfinished减至0,也不应该通知allsignaled。考虑这样一种情况:需要注册并等待resultpromise1-3,其中resultpromise1、2在注册期间即已被通知,而resultpromise3未被通知。如果不判断nomore,那么注册完resultpromise1、2后,countunfinished即已减至0,导致提前通知allsignaled;这是一个时序错误,因为实际上resultpromise3还没有完成。3.为保证线程安全,访问变量时须上锁,此处使用synchronized来实现。4.注意,调用untilallsignaled()时,如果countunfinished的初值已经为0,则应立刻通知allsignaled;因为countunfinished已经不可能再递减,之后没有机会再通知allsignaled了。//privatestaticclasslock。无成员,仅用于synchronized(lock)privatefinallocklock=newlock();privateintcountunfinished=0;privatefinalvoidpromiseallsignaled=newvoidpromise();publicvoidadd(promisepromise){if(promise.issignaled()){return;}synchronized(lock){countunfinished++;}promise.await(unused->{synchronized(lock){countunfinished--;if(countunfinished==0&&nomore){allsignaled.signalall();}}});}publicvoidpromiseuntilallsignaled(){synchronized(lock){if(countunfinished==0){allsignaled.signalall();}else{nomore=true;}}returnallsignaled;}4.3批量请求:executorasync批量请求的特性“批量请求”(也称“bulk”、“batch”)是指发送一条消息即可携带多条请求,主要用于数据库访问和远程调用等场景。由于减少了网络io次数、节约了构造和传输消息的开销,批量请求能有效提升吞吐量。很多数据库api都支持批量读写,如jdbcpreparedstatement[e]、elasticsearchbulkapi[f]、mongodbinsertmany()[g]、influxdbbatchpoints[h],读者可以查阅参考文献进一步了解。为了提升性能,部分api会牺牲易用性。其中,elasticsearchbulkapi对调用者的限制最少,允许混杂增删改等不同类型的请求,允许写入不同的数据库表(index);mongodb、influxdb次之,一个批量请求只能写入同一个数据库表,但是可以自定义每条数据的字段;preparedstatement的灵活性最低,其定义了sql语句的模版,调用者只能填写模版参数,而不能修改语句结构。虽然数据库api已经支持批量访问,但是很多原生api仍然需要调用者自己构造批量请求,需要调用者处理请求组装、批量大小、并发请求数等复杂的细节。在此,我们设计出通用组件executorasync,封装请求调度策略以提供更简洁的api。executorasync的使用流程如下面的代码片段所示:1.类似于线程池executorservice.submit(),调用者可以调用executorasync.submit()来提交一个请求。其中,请求以数据对象request表示,用于存储请求类型和请求参数。2.提交请求后,调用者获得promise对象,以获取响应数据。由于使用了promise,executorasync支持纯异步操作,提交请求和获取响应数据都不需要阻塞。3.executorasync内部对请求进行调度,并非提交一条请求就立刻执行,而是每隔固定时间收集一批请求,将其组装为一个批量请求,再调用实际的数据库访问api。如果数据库访问api允许,那么一批请求可以混杂不同的请求类型,或者操作不同的数据库表。executorasyncexecutor=newexecutorasync();promiseresultpromise1=executor.submit(newrequest("data1"));promiseresultpromise2=executor.submit(newrequest("data2"));promiseresultpromise3=executor.submit(newrequest("data3"));具体而言,executorasync支持如下调度策略:1.排队,如图4-4a所示。调用者提交请求request后不要立刻执行,而是将其缓存在队列queue中。图4-4aexecutorasync特性:排队2.批量,如图4-4b所示。每隔固定时间间隔,executorasync从队列中取出若干条请求,将其组装为批量请求bulk,并调用底层数据库api提交给服务端。如果队列长度增长得很快,我们也可以定义一个批量大小bulksize,当队列长度到达该值时立刻组装一个批量请求并提交。图4-4bexecutorasync特性:批量3.并发,如图4-4c所示。如果底层数据库api支持异步提交请求,那么executorasync就可以充分利用这种特性,连续提交多个批量请求,而不需要等待之前的批量请求返回。为避免数据库服务器超载,我们可以定义并发度parallelism,限制正在执行(inflight)的批量请求的数目;当达到限制时,如果调用者再提交新的请求,就暂存在队列queue中等待执行,而不会组装新的批量请求。图4-4cexecutorasync特性:并发4.丢弃,如图4-4d所示。在上文提到的bulksize和parallelism的限制下,如果提交请求的速率远高于服务端响应的速率,那么大量请求就会堆积在队列中等待处理,最终导致超时失败。在这种情况下,将请求发送给服务端已经没有意义,因为调用者已经认定请求失败,而不再关心响应数据。图4-4d请求超时因此,executorasync应该及时从队列中移除无效请求,而剩余请求仍然“新鲜”。这种策略能够强制缩短队列长度,以降低后续请求在队列中的堆积时长、预防请求超时;同时,由于避免存储和发送无效请求,这种策略也能节约内存和io开销。图4-4eexecutorasync特性:丢弃批量请求的实现上一小节我们看到了executorasync的调度策略,包括排队、批量、并发、丢弃。如下面的代码所示,executorasync只需对外提供submit(request)方法,用于提交单条请求。请求以数据对象request表示,其字段request.resultpromise是promise对象,用于通知响应数据;在需要进行异常处理的场景下,我们使用promiseorexception作为promise的实现,其中泛型模版t改为响应数据的实际类型。publicclassexecutorasync{publicpromiseorexceptionsubmit(requestrequest){returnrequest.resultpromise;}}接下来我们来看看executorasync的实现原理。由于源码细节较多、篇幅较长,故本节用流程图的形式,来讲解更高层的设计,如图4-5所示。图4-5executorasync原理1.提交请求。调用者调用executorasync.submit(request),每次调用提交一条请求。该条请求存入队列queue,等待后续调度执行。参数request的结构如下面的代码所示,包括下列字段:predicate:函数,判断请求是否有效,无效请求(如超时的请求)将被丢弃。详见步骤2。resultpromise:通知响应数据。publicclassrequest{publicfinalpredicateepredicate;publicfinalpromiseorexceptionresultpromise;}2.每隔固定间隔,或者queue.size()达到bulksize,尝试组装批量请求。从队列queue中依次取出请求,每条请求执行函数request.predicate,以判断是否仍然要提交该请求;取出的有效请求的条数,不超过bulksize。predicate是一个函数,类似于jdkpredicate接口,形式如下面的代码所示。接口函数test()可以正常返回,表示请求仍然有效;也可以抛出异常,说明请求无效的原因,如等待超时。如果抛出异常,则该条请求直接丢弃,并将发生的异常将通知给request.resultpromise,使得调用者执行异常处理逻辑。publicinterfacepredicatee{voidtest()throwsexception;}3.提交批量请求。第2步从队列queue中取出了至多bulksize条请求,将其作为参数调用requestfunc.execute(requests),以提交批量请求。接口requestfunc的形式如下面的代码所示。接口方法execute(requests)以若干条请求为参数,将其组装为批量请求,调用底层的数据库api来提交。publicinterfacerequestfunc{voidexecute(list>requests);}4.当收到响应后,对于每条请求,依次向request.resultpromise通知响应数据。5.为防止服务端超载,executorasync可限制并发请求数不超过parallelism。我们设置计数变量inflight=0,以统计正在执行的批量请求数:a.当尝试组装批量请求(步骤2)时,首先判断inflight