當前位置:
首頁 > 知識 > Hystrix是個什麼玩意兒

Hystrix是個什麼玩意兒

(給

ImportNew

加星標,提高Java技能)


轉自:

崔世寧


https://www.cnblogs.com/kingszelda/p/10312242.html

1. 什麼是Hystrix

Hystrix是Netflix的一個開源框架,地址如下:https://github.com/Netflix/Hystrix



中文名為「豪豬」,即平時很溫順,在

感受到危險

的時候,用刺

保護自己

;在

危險過去

後,還是一個溫順的肉球。



所以,整個框架的核心業務也就是這2點:




  1. 何時需要保護



  2. 如何保護

2. 何時需要保護



對於一個系統而言,它往往承擔著2層角色,服務提供者與服務消費者。對於服務消費者而言最大的痛苦就是如何「明哲保身」,做過網關項目的同學肯定感同身受


上面是一個常見的系統依賴關係,底層的依賴往往很多,通信協議包括 socket、HTTP、Dubbo、WebService等等。當通信層發生網路抖動以及所依賴的系統發生業務響應異常時,我們業務本身所提供的服務能力也直接會受到影響。



這種效果傳遞下去就很有可能造成雪崩效應,即整個業務聯調發生異常,比如業務整體超時,或者訂單數據不一致。



那麼核心問題就來了,如何檢測業務處於異常狀態?



成功率!

成功率直接反映了業務的數據流轉狀態,是最直接的業務表現。



當然,也可以根據超時時間做判斷,比如 Sentinel 的實現。其實這裡概念上可以做一個轉化,用時間做超時控制,超時=失敗,這依然是一個成功率的概念。



3. 如何保護 


如同豪豬一樣,「刺」就是他的保護工具,所有的攻擊都會被刺無情的懟回去。



在 Hystrix 的實現中,這就出現了「熔斷器」的概念,即當前的系統是否處於需要保護的狀態。



當熔斷器處於開啟的狀態時,所有的請求都不會真正的走之前的業務邏輯,而是直接返回一個約定的信息,即 FallBack。通過這種快速失敗原則保護我們的系統。 



但是,系統不應該永遠處於「有刺」的狀態,當危險過後需要恢復正常。



於是對熔斷器的核心操作就是如下幾個功能:





  1. 如果成功率過低,就打開熔斷器,阻止正常業務



  2. 隨著時間的流動,熔斷器處於半打開狀態,嘗試性放入一筆請求


  熔斷器的核心 API 如下圖: 

4. 限流、熔斷、隔離、降級



這四個概念是我們談起微服務會經常談到的概念,這裡我們討論的是 Hystrix 的實現方式。

限流



  • 這裡的限流與 Guava 的 RateLimiter 的限流差異比較大,一個是為了「保護自我」,一個是「保護下游」



  • 當對服務進行限流時,超過的流量將直接 Fallback,即熔斷。而 RateLimiter 關心的其實是「流量整形」,將不規整流量在一定速度內規整

熔斷



  • 當我的應用無法提供服務時,我要對上游請求熔斷,避免上游把我壓垮



  • 當我的下游依賴成功率過低時,我要對下游請求熔斷,避免下游把我拖垮

降級



  • 降級與熔斷緊密相關,熔斷後業務如何表現,約定一個快速失敗的 Fallback,即為服務降級

隔離



  • 業務之間不可互相影響,不同業務需要有獨立的運行空間



  • 最徹底的,可以採用物理隔離,不同的機器部



  • 次之,採用進程隔離,一個機器多個 Tomcat



  • 次之,請求隔離



  • 由於 Hystrix 框架所屬的層級為代碼層,所以實現的是請求隔離,線程池或信號量

5. 源碼分析




先上一個 Hystrix 的業務流程圖



可以看到 Hystrix 的請求都要經過 HystrixCommand 的包裝,其核心邏輯在 AbstractComman.java 類中。



下面的源碼是基於 RxJava 的,看之前最好先了解下 RxJava 的常見用法與邏輯,否則看起來會很迷惑。



簡單的說,RxJava 就是基於回調的函數式編程。通俗的說,就等同於策略模式的匿名內部類實現。



5.1 熔斷器



首先看信號量是如何影響我們請求的:


private

Observable<R>

applyHystrixSemantics

(

final

AbstractCommand<R> _cmd)

{
       

// 自定義擴展


       executionHook.onStart(_cmd);

       

//判斷熔斷器是否允許請求過來


       

if

(circuitBreaker.attemptExecution()) {
       

//獲得分組信號量,如果沒有採用信號量分組,返回默認通過的信號量實現


           

final

TryableSemaphore executionSemaphore = getExecutionSemaphore();
           

final

AtomicBoolean semaphoreHasBeenReleased =

new

AtomicBoolean(

false

);
       

//調用終止的回調函數


           

final

Action0 singleSemaphoreRelease =

new

Action0() {
               

@Override


               

public

void

call

()

{
                   

if

(semaphoreHasBeenReleased.compareAndSet(

false

,

true

)) {
                       executionSemaphore.release();
                   }
               }
           };
       

//調用異常的回調函數


           

final

Action1<Throwable> markExceptionThrown =

new

Action1<Throwable>() {
               

@Override


               

public

void

call

(Throwable t)

{
                   eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
               }
           };
       

//根據信號量嘗試競爭信號量


           

if

(executionSemaphore.tryAcquire()) {
               

try

{
                   

//競爭成功,註冊執行參數


                   executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                   

return

executeCommandAndObserve(_cmd)
                           .doOnError(markExceptionThrown)
                           .doOnTerminate(singleSemaphoreRelease)
                           .doOnUnsubscribe(singleSemaphoreRelease);
               }

catch

(RuntimeException e) {
                   

return

Observable.error(e);
               }
           }

else

{
          

//競爭失敗,進入fallback


               

return

handleSemaphoreRejectionViaFallback();
           }
       }

else

{
       

//熔斷器已打開,進入fallback


           

return

handleShortCircuitViaFallback();
       }
   }

什麼時候熔斷器可以放請求進來:


@Override


       

public

boolean

attemptExecution

()

{
       

//動態屬性判斷,熔斷器是否強制開著,如果強制開著,就不允許請求


           

if

(properties.circuitBreakerForceOpen().get()) {
               

return

false

;
           }
       

//如果強制關閉,就允許請求


           

if

(properties.circuitBreakerForceClosed().get()) {
               

return

true

;
           }
       

//如果當前是關閉,就允許請求


           

if

(circuitOpened.get() == -

1

) {
               

return

true

;
           }

else

{
          

//如果當前開著,就看是否已經過了"滑動窗口",過了就可以請求,不過就不可以


               

if

(isAfterSleepWindow()) {
                   

//only the first request after sleep window should execute


                   

//if the executing command succeeds, the status will transition to CLOSED


                   

//if the executing command fails, the status will transition to OPEN


                   

//if the executing command gets unsubscribed, the status will transition to OPEN


            

//這裡使用CAS的方式,只有一個請求能過來,即"半關閉"狀態


                   

if

(status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                       

return

true

;
                   }

else

{
                       

return

false

;
                   }
               }

else

{
                   

return

false

;
               }
           }
       }
   }

這裡有個重要概念就是"滑動窗口":

private

boolean

isAfterSleepWindow

()

{
           

final

long

circuitOpenTime = circuitOpened.get();
           

final

long

currentTime = System.currentTimeMillis();
           

final

long

sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
       

//滑動窗口的判斷就是看看熔斷器打開的時間與現在相比是否超過了配置的滑動窗口


           

return

currentTime > circuitOpenTime + sleepWindowTime;
       }

5.2 隔離

如果將業務請求進行隔離?


private

Observable<R>

executeCommandWithSpecifiedIsolation

(

final

AbstractCommand<R> _cmd)

{
     

//判斷隔離策略是什麼,是線程池隔離還是信號量隔離    


       

if

(properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
           

// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)


       

//線程池隔離的運行邏輯如下


           

return

Observable.defer(

new

Func0<Observable<R>>() {
               

@Override


               

public

Observable<R>

call

()

{
                   executionResult = executionResult.setExecutionOccurred();
                   

if

(!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                       

return

Observable.error(

new

IllegalStateException(

"execution attempted while in state : "

+ commandState.get().name()));
                   }
            

//按照配置生成監控數據


                   metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                   

if

(isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                       

// the command timed out in the wrapping thread so we will return immediately


                       

// and not increment any of the counters below or other such logic


                       

return

Observable.error(

new

RuntimeException(

"timed out before executing run()"

));
                   }
                   

if

(threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                       

//we have not been unsubscribed, so should proceed


                       HystrixCounters.incrementGlobalConcurrentThreads();
                       threadPool.markThreadExecution();
                       

// store the command that is being run


                       endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                       executionResult = executionResult.setExecutedInThread();
                       

/**
                        * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                        */


                       

try

{
                 

//執行擴展點邏輯


                           executionHook.onThreadStart(_cmd);
                           executionHook.onRunStart(_cmd);
                           executionHook.onExecutionStart(_cmd);
                           

return

getUserExecutionObservable(_cmd);
                       }

catch

(Throwable ex) {
                           

return

Observable.error(ex);
                       }
                   }

else

{
                       

//command has already been unsubscribed, so return immediately


                       

return

Observable.empty();
                   }
               }
        

//註冊各種場景的回調函數


           }).doOnTerminate(

new

Action0() {
               

@Override


               

public

void

call

()

{
                   

if

(threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                       handleThreadEnd(_cmd);
                   }
                   

if

(threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                       

//if it was never started and received terminal, then no need to clean up (I don"t think this is possible)


                   }
                   

//if it was unsubscribed, then other cleanup handled it


               }
           }).doOnUnsubscribe(

new

Action0() {
               

@Override


               

public

void

call

()

{
                   

if

(threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                       handleThreadEnd(_cmd);
                   }
                   

if

(threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                       

//if it was never started and was cancelled, then no need to clean up


                   }
                   

//if it was terminal, then other cleanup handled it


               }
        

//將邏輯放在線程池的調度器上執行,即將上述邏輯放入線程池中


           }).subscribeOn(threadPool.getScheduler(

new

Func0<Boolean>() {
               

@Override


               

public

Boolean

call

()

{
                   

return

properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
               }
           }));
       }

else

{
        

//走到這裡就是信號量隔離,在當前線程中執行,沒有調度器


           

return

Observable.defer(

new

Func0<Observable<R>>() {
               

@Override


               

public

Observable<R>

call

()

{
                   executionResult = executionResult.setExecutionOccurred();
                   

if

(!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                       

return

Observable.error(

new

IllegalStateException(

"execution attempted while in state : "

+ commandState.get().name()));
                   }

                   metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                   

// semaphore isolated


                   

// store the command that is being run


                   endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                   

try

{
                       executionHook.onRunStart(_cmd);
                       executionHook.onExecutionStart(_cmd);
                       

return

getUserExecutionObservable(_cmd);  

//the getUserExecutionObservable method already wraps sync exceptions, so this shouldn"t throw


                   }

catch

(Throwable ex) {
                       

//If the above hooks throw, then use that as the result of the run method


                       

return

Observable.error(ex);
                   }
               }
           });
       }
   }

5.3 核心運行流程


private

Observable<R>

executeCommandAndObserve

(

final

AbstractCommand<R> _cmd)

{
       

final

HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
     

//執行發生的回調


       

final

Action1<R> markEmits =

new

Action1<R>() {
           

@Override


           

public

void

call

(R r)

{
               

if

(shouldOutputOnNextEvents()) {
                   executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                   eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
               }
               

if

(commandIsScalar()) {
                   

long

latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                   eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                   executionResult = executionResult.addEvent((

int

) latency, HystrixEventType.SUCCESS);
                   eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (

int

) latency, executionResult.getOrderedList());
                   circuitBreaker.markSuccess();
               }
           }
       };
    

//執行成功的回調,標記下狀態,熔斷器根據這個狀態維護熔斷邏輯


       

final

Action0 markOnCompleted =

new

Action0() {
           

@Override


           

public

void

call

()

{
               

if

(!commandIsScalar()) {
                   

long

latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                   eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                   executionResult = executionResult.addEvent((

int

) latency, HystrixEventType.SUCCESS);
                   eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (

int

) latency, executionResult.getOrderedList());
                   circuitBreaker.markSuccess();
               }
           }
       };
    

//執行失敗的回調


       

final

Func1<Throwable, Observable<R>> handleFallback =

new

Func1<Throwable, Observable<R>>() {
           

@Override


           

public

Observable<R>

call

(Throwable t)

{
               circuitBreaker.markNonSuccess();
               Exception e = getExceptionFromThrowable(t);
               executionResult = executionResult.setExecutionException(e);
          

//各種回調進行各種fallback


               

if

(e

instanceof

RejectedExecutionException) {
                   

return

handleThreadPoolRejectionViaFallback(e);
               }

else

if

(t

instanceof

HystrixTimeoutException) {
                   

return

handleTimeoutViaFallback();
               }

else

if

(t

instanceof

HystrixBadRequestException) {
                   

return

handleBadRequestByEmittingError(e);
               }

else

{
                   

/*
                    * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                    */


                   

if

(e

instanceof

HystrixBadRequestException) {
                       eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                       

return

Observable.error(e);
                   }

                   

return

handleFailureViaFallback(e);
               }
           }
       };

       

final

Action1<Notification<?

super

R>> setRequestContext =

new

Action1<Notification<?

super

R>>() {
           

@Override


           

public

void

call

(Notification<?

super

R> rNotification)

{
               setRequestContextIfNeeded(currentRequestContext);
           }
       };

       Observable<R> execution;
       

if

(properties.executionTimeoutEnabled().get()) {
           execution = executeCommandWithSpecifiedIsolation(_cmd)
                   .lift(

new

HystrixObservableTimeoutOperator<R>(_cmd));
       }

else

{
           execution = executeCommandWithSpecifiedIsolation(_cmd);
       }
    

//註冊各種回調函數


       

return

execution.doOnNext(markEmits)
               .doOnCompleted(markOnCompleted)
               .onErrorResumeNext(handleFallback)
               .doOnEach(setRequestContext);
   }

6. 小結



  • Hystrix 是基於單機應用的熔斷限流框架



  • 根據熔斷器的滑動窗口判斷當前請求是否可以執行



  • 線程競爭實現「半關閉」狀態,拿一個請求試試是否可以關閉熔斷器



  • 線程池隔離將請求丟到線程池中運行,限流依靠線程池拒絕策略



  • 信號量隔離在當前線程中運行,限流依靠並發請求數



  • 當信號量競爭失敗/線程池隊列滿,就進入限流模式,執行 Fallback



  • 當熔斷器開啟,就熔斷請求,執行 Fallback 



  • 整個框架採用的 RxJava 的編程模式,回調函數滿天飛 


看完本文有收穫?請轉發分享給更多人


關注「ImportNew」,提升Java技能



喜歡就點「好看」唄~



喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 ImportNew 的精彩文章:

讓異常處理代碼更健壯

TAG:ImportNew |