當前位置:
首頁 > 知識 > Java NIO原理分析

Java NIO原理分析

這裡主要圍繞著Java NIO展開,從Java NIO的基本使用,到介紹Linux下NIO API,再到Java Selector其底層的實現原理。

  • Java NIO基本使用
  • Linux下的NIO系統調用介紹
  • Selector原理
  • Channel和Buffer之間的堆外內存

Java NIO基本使用

從JDK NIO文檔裡面可以發現,Java將其劃分成了三大塊:ChannelBuffer以及多路復用Selector。Channel的存在,封裝了對什麼實體的連接通道(如網路/文件);Buffer封裝了對數據的緩衝存儲,最後對於Selector則是提供了一種可以以單線程非阻塞的方式,來處理多個連接。

基本應用示例

NIO的基本步驟是,創建Selector和ServerSocketChannel,然後註冊channel的ACCEPT事件,調用select方法,等待連接的到來,以及接收連接後將其註冊到Selector中。下面的為Echo Server的示例:

public class SelectorDemo {

public static void main(String[] args) throws IOException {

Selector selector = Selector.open;
ServerSocketChannel socketChannel = ServerSocketChannel.open;
socketChannel.bind(new InetSocketAddress(8080));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int ready = selector.select;
if (ready == 0) {
continue;
} else if (ready < 0) { break; } Set keys = selector.selectedKeys;
Iterator iterator = keys.iterator;
while (iterator.hasNext) {

SelectionKey key = iterator.next;
if (key.isAcceptable) {

ServerSocketChannel channel = (ServerSocketChannel) key.channel;
SocketChannel accept = channel.accept;
if (accept == null) {
continue;
}
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable) {
// 讀事件
deal((SocketChannel) key.channel, key);
} else if (key.isWritable) {
// 寫事件
resp((SocketChannel) key.channel, key);
}
// 註:處理完成後要從中移除掉
iterator.remove;
}
}
selector.close;
socketChannel.close;
}

private static void deal(SocketChannel channel, SelectionKey key) throws IOException {

ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

int read = channel.read(buffer);

if (read > 0) {
buffer.flip;
responseBuffer.put(buffer);
} else if (read == -1) {
System.out.println("socket close");
channel.close;
return;
}

key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
key.attach(responseBuffer);
}

private static void resp(SocketChannel channel, SelectionKey key) throws IOException {

ByteBuffer buffer = (ByteBuffer) key.attachment;
buffer.flip;

channel.write(buffer);
if (!buffer.hasRemaining) {
key.attach(null);
key.interestOps(SelectionKey.OP_READ);
}
}
}

Linux下的NIO系統調用介紹

在Linux環境下,提供了幾種方式可以實現NIO,如epoll,poll,select等。對於select/poll,每次調用,都是從外部傳入FD和監聽事件,這就導致每次調用的時候,都需要將這些數據從用戶態複製到內核態,就導致了每次調用代價比較大,而且每次從select/poll返回回來,都是全量的數據,需要自行去遍歷檢查哪些是READY的。對於epoll,則為增量式的,系統內部維護了所需要的FD和監聽事件,要註冊的時候,調用epoll_ctl即可,而每次調用,不再需要傳入了,返回的時候,只返回READY的監聽事件和FD。下面作個簡單的偽代碼:

// 1. 創建server socket
// 2. 綁定地址
// 3. 監聽埠
// 4. 創建epoll
int epollFd = epoll_create(1024);
// 5. 註冊監聽事件
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
event.data.fd = serverFd;
epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event);

while(true) {
readyNums = epoll_wait( epollFd, events, 1024, -1 );

if ( readyNums < 0 ) { printf("epoll_wait error "); exit(-1); } for ( i = 0; i < readyNums; ++i) { if ( events[i].data.fd == serverFd ) { clientFd = accept( serverFd, NULL, NULL ); // 註冊監聽事件 ... }else if ( events[i].events & EPOLLIN ) { // 處理讀事件 }else if ( events[i].events & EPOLLRDHUP ) { // 關閉連接事件 close( events[i].data.fd ); } }

Selector原理SelectionKey

從Java頂層使用者角度來看,channel通過註冊,返回SelectionKey,而Selector.select方法,也是通過返回SelectionKey來使用。那麼這裡為什麼會需要這個類呢?這個類有什麼作用?無論是任何語言,其實都脫離不了系統底層的支持,通過上述Linux下的基本應用,可以知道,通過系統調用,向其傳遞和返回的都是FD以及事件這些參數,那麼站在設計角度來看,就需要有一個映射關係,使得可以關聯起來,這裡有Channel封裝的是通過,如果將READY事件這些參數放在裡面,不太合適,這個時候,SelectionKey出現了,在SelectionKey內部,保存Channel的引用以及一些事件信息,然後Selector通過FD找到SelectionKey來進行關聯。在底層EP裡面,就有一個屬性:Map fdToKey

EPollSelectorImpl

在Linux 2.6+版本,Java NIO採用的epoll(即EPollSelectorImpl類),對於2.4.x的,則使用poll(即PollSelectorImpl類),這裡以epoll為例。

select方法

頂層Selector,通過調用select方法,最終會調用到EPollSelectorImpl.doSelect方法,通過該方法,可以看到,其首先會處理一些不再註冊的事件,調用pollWrapper.poll(timeout);,然後再進行一次清理,最後,可以看到需要處理映射關係

protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException;
// 處理一些不再註冊的事件
processDeregisterQueue;
try {
begin;
pollWrapper.poll(timeout);
} finally {
end;
}
// 再進行一次清理
processDeregisterQueue;
int numKeysUpdated = updateSelectedKeys;
if (pollWrapper.interrupted) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex, 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted;
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}

private int updateSelectedKeys {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i

EPollArrayWrapper

EpollArrayWrapper封裝了底層的調用,裡面包含幾個native方法,如:

private native int epollCreate;
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;

在openjdk的native目錄(native/sun/nio/ch)裡面可以找到對應的實現EPollArrayWrapper.c。

(這裡順帶提一下,要實現native方法,可以在類里的方法加上native關鍵字,然後編譯成class文件,再轉換輸出.h,c/c++底層實現該頭文件的方法,編譯成so庫,放到對應目錄即可)

在初始化文件方法裡面,可以看到,是通過動態解析載入進來的,最終調用的epoll_create等方法。

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_init(JNIEnv *env, jclass this)
{
epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
epoll_ctl_func = (epoll_ctl_t) dlsym(RTLD_DEFAULT, "epoll_ctl");
epoll_wait_func = (epoll_wait_t) dlsym(RTLD_DEFAULT, "epoll_wait");

if ((epoll_create_func == NULL) || (epoll_ctl_func == NULL) ||
(epoll_wait_func == NULL)) {
JNU_ThrowInternalError(env, "unable to get address of epoll functions, pre-2.6 kernel?");
}
}

Channel和Buffer之間的堆外內存

經常會聽見別人說,堆外內存容易泄漏,以及Netty框架裡面採用了堆外內存,減少拷貝提高性能。那麼這裡面的堆外內存指的是什麼?之前懷著一個好奇心,通過read方法,最後追蹤到SocketChannelImpl裡面read方法,裡面調用了IOUtil的read方法。裡面會首先判斷傳入的Buffer是不是DirectBuffer,如果不是(則是HeapByteBuffer),則會創建一個臨時的DirectBuffer,然後再將其複製到堆內。IOUtil.read方法:

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {
if(var1.isReadOnly) {
throw new IllegalArgumentException("Read-only buffer");
} else if(var1 instanceof DirectBuffer) {
// 為堆外內存,則直接讀取
return readIntoNativeBuffer(var0, var1, var2, var4, var5);
} else {
// 為堆內內存,先獲取臨時堆外內存
ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining);

int var8;
try {
// 讀取到堆外內存
int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
var6.flip;
if(var7 > 0) {
// 複製到堆內
var1.put(var6);
}

var8 = var7;
} finally {
// 釋放臨時堆外內存
Util.offerFirstTemporaryDirectBuffer(var6);
}

return var8;
}
}

這裡有一個問題就是,為什麼會需要DirectBuffer以及堆外內存?通過對DirectByteBuffer的創建來分析,可以知道,通過unsafe.allocateMemory(size);來分配內存的,而對於該方法來說,可以說是直接調用malloc返回,這一塊內存是不受GC管理的,也就是所說的:堆外內存容易泄漏。但是對於使用DirectByteBuffer來說,會創建一個Deallocator,註冊到Cleaner裡面,當對象被回收的時候,則會被直接,從而釋放掉內存,減少內存泄漏。要用堆外內存,從上面的創建來看,堆外內存創建後,以long型地址保存的,而堆內內存會受到GC影響,對象會被移動,如果採用堆內內存,進行系統調用的時候,那麼GC就需要停止,否則就會有問題,基於這一點,採用了堆外內存(這一塊參考了R大的理解:https://www.zhihu.com/question/57374068)。

註:堆外內存的創建(unsafe.cpp):

// 僅僅作了對齊以及將長度放在數組前方就返回了
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
UnsafeWrapper("Unsafe_AllocateMemory");
size_t sz = (size_t)size;
if (sz != (julong)size || size < 0) { THROW_0(vmSymbols::java_lang_IllegalArgumentException); } if (sz == 0) { return 0; } sz = round_to(sz, HeapWordSize); void* x = os::malloc(sz); if (x == NULL) { THROW_0(vmSymbols::java_lang_OutOfMemoryError); } //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize); return addr_to_java(x); UNSAFE_END

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

PHP設計模式:簡單工廠
巧設publicPath,優雅適配生產環境要求
Linux網路編程客戶伺服器設計範式
《基於Node.js實現簡易聊天室系列之詳細設計》

TAG:科技優家 |

您可能感興趣

LruCache原理分析
Prometheus原理和源碼分析
RPC 基本原理與 Apach Thrift 初體驗
CGLIB(Code Generation Library) 介紹與原理
BGP Confederation原理實踐
華為Nov 5i Pro原理圖泄漏
SpringBoot自動裝配原理分析
基於Netty的Android系統IM簡單實現原理
深入理解 Web Server 原理與實踐:Nginx
Linux內存映射mmap原理分析
AlphaGo之父DeepMind再出神作,PrediNet原理詳解
Leap Motion發布AR頭顯Project North Star原理圖
ES6之類class的原理解析
Veritas Velocity數據副本管理技術、原理詳解
Remote Procedure Call基本原理
札記CAS、AQS、CountDownLatch、CyclicBarrier的原理
GBDT分類的原理及Python實現
集群管理工具KafkaAdminClient——原理與示例
ZooKeeper 設計原理
HashMap的實現原理(JDK8)