Hystrix请求合并与请求缓存(二):请求合并

 公司新闻     |      2019-10-16 20:34

今日继续摸鱼Hystrix的请求合并部分,可能不如请求缓存分析的详细,但是我感觉足够表达实现原理了。

本文选择了较为简单的请求合并的用例进行切入并分析,即CommandCollapserGetValueForKey,而非ObservableCollapserGetWordForNumber,原理都是一致的,只是ObservableCollapserGetWordForNumber提供了更为丰富的接口,供业务实现。

从CommandCollapserGetValueForKey例子看,只要做如下3件事,就能实现请求合并。

1、继承HystrixCollapser BatchReturnType, ResponseType, RequestArgumentType 。 2、重写三个方法,分别为getRequestArgument、createCommand、mapResponseToRequests。 3、写一个BatchCommand,即请求合并后的一个HystrixCommand。

接下来,可以从源码层面上看,如何通过这三步操作实现请求合并。

**
 * 根据设定时间参数以及合并请求数,将多个HystrixCommand合并成一次的HystrixCommand,从而将短时间调用服务的次数减少。
 * p 
 * 通常将时间窗口设为10ms左右
 * @param BatchReturnType 
 * 合并后的HystrixCommand的返回类型,例如String变成List String 。
 * @param ResponseType 
 * 需要合并的HystrixCommand的返回类型。
 * @param RequestArgumentType 
 * 需要合并的HystrixCommand的请求参数类型。
public abstract class HystrixCollapser BatchReturnType, ResponseType, RequestArgumentType implements HystrixExecutable ResponseType , HystrixObservable ResponseType {
复制代码

请求合并的过程,从例子可以看出,合并后的BatchCommand的参数为Collection CollapsedRequest String, Integer requests,即请求合并的过程就是从单个请求的参数合并成Collection CollapsedRequest ResponseType, RequestArgumentType 。

因此,可以从getRequestArgument的调用入手,就找到了HystrixCollapser.toObservable。

// 提交请求,直接返回结果了。。。
Observable ResponseType response = requestCollapser.submitRequest);
 * Submit a request to a batch. If the batch maxSize is hit trigger the batch immediately.
 * 和清楚了将时间窗口内的请求提交,如果到了设定的合并阈值,触发一次合并请求
 * @param arg argument to a {@link RequestCollapser} 
 * @return Observable ResponseType 
 * @throws IllegalStateException
 * if submitting after shutdown
 public Observable ResponseType submitRequest {
 * 启动计时器,时间窗口阈值到了,则触发一次合并请求
 if  timerListenerRegistered.compareAndSet) {
 /* schedule the collapsing task to be executed every x milliseconds  */
 timerListenerReference.set));
 // loop until succeed 
 // 等待-通知模型
 while  {
 // 拿到RequestBatch
 final RequestBatch BatchReturnType, ResponseType, RequestArgumentType b = batch.get;
 if  {
 return Observable.error);
 final Observable ResponseType response;
 // 添加到RequestBatch
 if  {
 response = b.offer;
 } else {
 response = b.offer NULL_SENTINEL);
 // it will always get an Observable unless we hit the max batch size
 // 添加成功,返回 Observable
 if  {
 return response;
 } else {
 // this batch can't accept requests so create a new one and set it if another thread doesn't beat us
 // 添加失败,执行 RequestBatch ,并创建新的 RequestBatch
 createNewBatchAndExecutePreviousIfNeeded;
复制代码
 public Observable ResponseType offer {
 2: // 执行已经开始,添加失败
 3: /* short-cut - if the batch is started we reject the offer */
 4: if ) {
 5: return null;
 6: }
 8: /*
 9: * The 'read' just means non-exclusive even though we are writing.
10: */
11: if .tryLock) {
12: try {
13: // 执行已经开始,添加失败
14: /* double-check now that we have the lock - if the batch is started we reject the offer */
15: if ) {
16: return null;
17: }
19: // 超过队列最大长度,添加失败
20: if  = maxBatchSize) {
21: return null;
22: } else {
23: // 创建 CollapsedRequestSubject ,并添加到队列
24: CollapsedRequestSubject ResponseType, RequestArgumentType collapsedRequest = new CollapsedRequestSubject ResponseType, RequestArgumentType ;
25: final CollapsedRequestSubject ResponseType, RequestArgumentType existing =  argumentMap.putIfAbsent;
26: /**
27: * If the argument already exists in the batch, then there are 2 options:
28: * A) If request caching is ON : only keep 1 argument in the batch and let all responses
29: * be hooked up to that argument
30: * B) If request caching is OFF: return an error to all duplicate argument requests
31: *
32: * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible
33: * logic 
34: * of trying to figure out which argument of a set of duplicates should get attached to a response.
35: *
36: * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
37: */
38: if  {
39: boolean requestCachingEnabled = properties.requestCacheEnabled.get;
40: if  {
41: return existing.toObservable;
42: } else {
43: return Observable.error.name + " or prevent duplicates from making it into the batch!"));
44: }
45: } else {
46: return collapsedRequest.toObservable;
47: }
49: }
50: } finally {
51: batchLock.readLock.unlock;
52: }
53: } else {
54: return null;
55: }
56: }
复制代码

第 38 至 47 行 :返回Observable。当argumentMap已经存在arg对应的Observable时,必须开启缓存 功能。原因是,如果在相同的arg,并且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable,那么相同的arg将有多个Observable执行命令,此时HystrixCollapserBridge.mapResponseToRequests 方法无法将执行赋值到arg对应的命令请求 ,见 github.com/Netflix/Hys… 。

回过头看HystrixCollapser#toObservable方法的代码,这里也有对缓存功能,是不是重复了呢?argumentMap 针对的是RequestBatch级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是 1 : 1 : N 的关系,通过 HystrixCollapser#toObservable 对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。

CollapsedTask负责触发时间窗口内合并请求的处理,其实关键方法就是createNewBatchAndExecutePreviousIfNeeded,并且也调用了executeBatchIfNotAlreadyStarted。

/**
 * Executed on each Timer interval execute the current batch if it has requests in it.
 private class CollapsedTask implements TimerListener {
 @Override
 public Void call throws Exception {
 try {
 // we fetch current so that when multiple threads race
 // we can do compareAndSet with the expected/new to ensure only one happens
 // 拿到合并请求
 RequestBatch BatchReturnType, ResponseType, RequestArgumentType currentBatch = batch.get;
 // 1) it can be null if it got shutdown
 // 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
 // 处理合并请求
 if  0) {
 // do execution within context of wrapped Callable
 createNewBatchAndExecutePreviousIfNeeded;
 } catch  {
 logger.error;
 t.printStackTrace;
 // ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
 return null;
复制代码