當前位置:
首頁 > 知識 > disruptor 源碼解讀

disruptor 源碼解讀

(點擊

上方公眾號

,可快速關注)




來源:笨狐狸 ,


blog.csdn.net/liweisnake/article/details/78842176




disruptor經過幾年的發展,似乎已經成為性能優化的大殺器,幾乎每個想優化性能的項目宣稱自己用上了disruptor,性能都會呈現質的躍進。畢竟,最好的例子就是LMAX自己的架構設計,支撐了600w/s的吞吐。




本文試圖從代碼層面將關鍵問題做些解答。



基本概念




Disruptor: 實際上就是整個基於ringBuffer實現的生產者消費者模式的容器。




RingBuffer: 著名的環形隊列,可以類比為BlockingQueue之類的隊列,ringBuffer的使用,使得內存被循環使用,減少了某些場景的內存分配回收擴容等耗時操作。




 



EventProcessor: 事件處理器,實際上可以理解為消費者模型的框架,實現了線程Runnable的run方法,將循環判斷等操作封在了裡面。







EventHandler: 事件處置器,與前面處理器的不同是,事件處置器不負責框架內的行為,僅僅是EventProcessor作為消費者框架對外預留的擴展點罷了。




Sequencer: 作為RingBuffer生產者的父介面,其直接實現有SingleProducerSequencer和MultiProducerSequencer。






EventTranslator: 事件轉換器。實際上就是新事件向舊事件覆蓋的介面定義。




SequenceBarrier: 消費者路障。規定了消費者如何向下走。都說disruptor無鎖,事實上,該路障算是變向的鎖。




WaitStrategy: 當生產者生產得太快而消費者消費得太慢時的等待策略。







把上面幾個關鍵概念畫個圖,大概長這樣:






所以接下來主要也就從生產者,消費者以及ringBuffer3個維度去看disruptor是如何玩的。




生產者




生產者發布消息的過程從disruptor的publish方法為入口,實際調用了ringBuffer的publish方法。publish方法主要做了幾件事,一是先確保能拿到後面的n個sequence;二是使用translator來填充新數據到相應的位置;三是真正的聲明這些位置已經發布完成。




public void publishEvent(EventTranslator<E> translator)  


{  


  final long sequence = sequencer.next();  


  translateAndPublish(translator, sequence);  


}  


 public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)  


 {  


     checkBounds(translators, batchStartsAt, batchSize);  


     final long finalSequence = sequencer.next(batchSize);  


     translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);  


 }




獲取生產者下一個sequence的方法,細節已經注釋,實際上最終目的就是確保生產者和消費者互相不越界。





public long next(int n)  


{  


    if (n < 1)  


    {  


        throw new IllegalArgumentException("n must be > 0");  


    }  


    //該生產者發布的最大序列號  


    long nextValue = this.nextValue;  


    //該生產者欲發布的序列號  


    long nextSequence = nextValue + n;  


    //覆蓋點,即該生產者如果發布了這次的序列號,那它最終會落在哪個位置,實際上是nextSequence做了算術處理以後的值,最終目的是統一計算,否則就要去判絕對值以及取模等麻煩操作  


    long wrapPoint = nextSequence - bufferSize;  


    //所有消費者中消費得最慢那個的前一個序列號  


    long cachedGatingSequence = this.cachedValue;  


   


    //這裡兩個判斷條件:一是看生產者生產是不是超過了消費者,所以判斷的是覆蓋點是否超過了最慢消費者;二是看消費者是否超過了當前生產者的最大序號,判斷的是消費者是不是比生產者還快這種異常情況  


    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)  


    {  


        cursor.setVolatile(nextValue);  // StoreLoad fence  


   


        long minSequence;  


        //覆蓋點是不是已經超過了最慢消費者和當前生產者序列號的最小者(這兩個有點難理解,實際上就是覆蓋點不能超過最慢那個生產者,也不能超過當前自身,比如一次發布超過bufferSize),gatingSequences的處理也是類似算術處理,也可以看成是相對於原點是正還是負  


        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  


        {  


            //喚醒阻塞的消費者  


            waitStrategy.signalAllWhenBlocking();  


            //等上1納秒  


            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?  


        }  


        //把這個最慢消費者緩存下來,以便下一次使用  


        this.cachedValue = minSequence;  


    }  


    //把當前序列號更新為欲發布序列號  


    this.nextValue = nextSequence;  


   


    return nextSequence;  


}




translator由用戶在調用時自己實現,其實就是預留的一個擴展點,將覆蓋事件預留出來。大部分實現都是將ByteBuffer複製到Event中,參考disruptor github官方例子。




最後聲明新序列號發布完成,實際上就是設置了cursor,並且通知可能阻塞的消費者,這裡已經發布完新的Event了,快來消費吧。





public void publish(long sequence)  


{  


    cursor.set(sequence);  


    waitStrategy.signalAllWhenBlocking();  


}




以上就是單生產者的分析,MultiProducerSequencer可以類似分析。




等待策略





等待策略實際上就是用來同步生產者和消費者的方法。SequenceBarrier只有一個實現ProcessingSequenceBarrier,中間就用到了WaitStrategy




BlockingWaitStrategy就是真正的加鎖阻塞策略,採用的就是ReentrantLock以及Condition來控制阻塞與喚醒。




TimeoutBlockingWaitStrategy是BlockingWaitStrategy中條件帶超時的版本。




LiteBlockingWaitStrategy是BlockingWaitStrategy的改進版,走了ReentrantLock和CAS輕量級鎖結合的方式,不過注釋說這算是實驗性質的微性能改進。




BusySpinWaitStrategy算是一個自旋鎖,其實現很有趣,即不停的調用Thread類的onSpinWait方法。




YieldingWaitStrategy是自旋鎖的一種改進,自旋鎖對於cpu來說太重,於是YieldingWaitStrategy先自旋100次,如果期間沒有達成退出等待的條件,則主動讓出cpu給其他線程作為懲罰。




SleepingWaitStrategy又是YieldingWaitStrategy的一種改進,SleepingWaitStrategy頭100次先自旋,如果期間沒有達成退出條件,則接下來100次主動讓出cpu作為懲罰,如果還沒有達成條件,則不再計數,每次睡1納秒。




PhasedBackoffWaitStrategy相對複雜點,基本上是10000次自旋以後要麼出讓cpu,然後繼續自旋,要麼就採取新的等待策略。




消費者




EventProcessor是整個消費者事件處理框架,其主體就是線程的run方法,來看BatchEventProcessor,總體比較簡單。





public void run()  


{  


    if (!running.compareAndSet(false, true))  


    {  


        throw new IllegalStateException("Thread is already running");  


    }  


    sequenceBarrier.clearAlert();  


   


    notifyStart();  


   


    T event = null;  


    long nextSequence = sequence.get() + 1L;  


    try 


    {  


        while (true)  


        {  


            try 


            {  


                //等待至少一個可用的sequence出來  


               final long availableSequence = sequenceBarrier.waitFor(nextSequence);  


                if (batchStartAware != null)  


                {  


                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);  


                }  


                //一個一個消費事件  


                while (nextSequence <= availableSequence)  


                {  


                    //從ringBuffer里獲取下一個事件  


                    event = dataProvider.get(nextSequence);  


                    //消費這個事件  


                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);  


                    nextSequence++;  


                }  


                //當前的sequence推進到availableSequence  


                sequence.set(availableSequence);  


            }  


            catch (final TimeoutException e)  


            {  


                notifyTimeout(sequence.get());  


            }  


            catch (final AlertException ex)  


            {  


                if (!running.get())  


                {  


                    break;  


                }  


            }  


            catch (final Throwable ex)  


            {  


                exceptionHandler.handleEventException(ex, nextSequence, event);  


                sequence.set(nextSequence);  


                nextSequence++;  


            }  


        }  


    }  


    finally 


    {  


        notifyShutdown();  


        running.set(false);  


    }  


}




RingBuffer




RingBuffer這邊代碼比較簡單,主要就是封裝了一下發布的api





abstract class RingBufferFields<E> extends RingBufferPad  


{  


    private static final int BUFFER_PAD;  


    private static final long REF_ARRAY_BASE;  


    private static final int REF_ELEMENT_SHIFT;  


    private static final Unsafe UNSAFE = Util.getUnsafe();  


   


    static 


    {  


        final int scale = UNSAFE.arrayIndexScale(Object[].class);  


        if (4 == scale)  


        {  


            REF_ELEMENT_SHIFT = 2;  


        }  


        else if (8 == scale)  


        {  


            REF_ELEMENT_SHIFT = 3;  


        }  


        else 


        {  


            throw new IllegalStateException("Unknown pointer size");  


        }  


        // 如果scale是4, BUFFER_PAD則為32  


       BUFFER_PAD = 128 / scale;  


        // Including the buffer pad in the array base offset BUFFER_PAD<<REF_ELEMENT_SHIFT 實際上就是BUFFER_PAD * scale,最終算出來REF_ARRAY_BASE就是基地址  


        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);  


    }  


   


    private final long indexMask;  


    private final Object[] entries;  


    protected final int bufferSize;  


    protected final Sequencer sequencer;  


   


    RingBufferFields(  


        EventFactory<E> eventFactory,  


        Sequencer sequencer)  


    {  


        this.sequencer = sequencer;  


        this.bufferSize = sequencer.getBufferSize();  


   


        if (bufferSize < 1)  


        {  


            throw new IllegalArgumentException("bufferSize must not be less than 1");  


        }  


        if (Integer.bitCount(bufferSize) != 1)  


        {  


            throw new IllegalArgumentException("bufferSize must be a power of 2");  


        }  


   


        this.indexMask = bufferSize - 1;  


        //bufferSize再加兩倍的BUFFER_PAD大小,BUFFER_PAD分別在頭尾  


       this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];  


        fill(eventFactory);  


    }  


   


    private void fill(EventFactory<E> eventFactory)  


    {  


        for (int i = 0; i < bufferSize; i++)  


        {  


            //初始化整個buffer  


            entries[BUFFER_PAD + i] = eventFactory.newInstance();  


        }  


    }  


   


    @SuppressWarnings("unchecked")  


    protected final E elementAt(long sequence)  


    {  


        //sequence & indexMask即對sequence取模, 最終算出來的就是基地址+偏移地址  


        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));  


    }  


}




主體代碼基本如上。其他代碼可以自行參考。




下面介紹下一些常見問題。




1. disruptor應該如何用才能發揮最大功效?




disruptor原本就是事件驅動的設計,其整個架構跟普通的多線程很不一樣。比如一種用法,將disruptor作為業務處理,中間帶I/O處理,這種玩法比多線程還慢;相反,如果將disruptor做業務處理,需要I/O時採用nio非同步調用,不阻塞disruptor消費者線程,等到I/O非同步調用回來後在回調方法中將後續處理重新塞到disruptor隊列中,可以看出來,這是典型的事件處理架構,確實能在時間上佔據優勢,加上ringBuffer固有的幾項性能優化,能讓disruptor發揮最大功效。




2. disruptor為啥這麼快?




這個問題參考之前的一篇文章

disruptor框架為什麼這麼強大





http://blog.csdn.net/liweisnake/article/details/9113119




3. 多生產者如何寫入消息?




多生產者的消息寫入實際上是通過availableBuffer與消費者來同步最後一個生產者寫入的位置,這樣,消費者永遠不能超越最慢的那個生產者。見如下代碼段





private void setAvailableBufferValue(int index, int flag)  


{  


    long bufferAddress = (index * SCALE) + BASE;  


    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);  


}  


   


@Override 


public boolean isAvailable(long sequence)  


{  


    int index = calculateIndex(sequence);  


    int flag = calculateAvailabilityFlag(sequence);  


    long bufferAddress = (index * SCALE) + BASE;  


    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;  


}  


   


@Override 


public long getHighestPublishedSequence(long lowerBound, long availableSequence)  


{  


    for (long sequence = lowerBound; sequence <= availableSequence; sequence++)  


    {  


        if (!isAvailable(sequence))  


        {  


            return sequence - 1;  


        }  


    }  


   


    return availableSequence;  


}




可以參考這篇文章

RingBuffer多生產者寫入





http://alicharles.com/article/disruptor-ringbuffer-muti-write/




4. 除了多個消費者重複處理生產者發送的消息,是否可以多消費者不重複處理生產者發送的消息,即各處理各的?




若要多消費者重複處理生產者的消息,則使用disruptor.handleEventsWith方法將消費者傳入;而若要消費者不重複的處理生產者的消息,則使用disruptor.handleEventsWithWorkerPool方法將消費者傳入。




看完本文有收穫?請轉發分享給更多人


關注「ImportNew」,提升Java技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 ImportNew 的精彩文章:

偵探劇場:堆內存神秘溢出事件

TAG:ImportNew |