當前位置:
首頁 > 知識 > Mina 報文監聽器NioDatagramAcceptor一

Mina 報文監聽器NioDatagramAcceptor一

Mina Io監聽器介面定義及抽象實現:http://donald-draper.iteye.com/blog/2378315

上一篇文章我們通過一個實例,簡單看報文通信,通過下面一句:

IoAcceptor acceptor = new NioDatagramAcceptor;

創建一個報文監聽器,今天我們來看一下報文監聽器NioDatagramAcceptor。

**
* {@link IoAcceptor} for datagram transport (UDP/IP).
*
* @author [url=http://mina.apache.org]Apache MINA Project[/url]
* @org.apache.xbean.XBean
*/
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor {
從報文監聽器繼承樹來看,報文監聽器直接實現了Io處理器的功能,在往下看之前,先來看一下報文監聽器介面DatagramAcceptor
的定義;
//DatagramAcceptor
/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
*
* @author [url=http://mina.apache.org]Apache MINA Project[/url]
*/
public interface DatagramAcceptor extends IoAcceptor {
/**
* @return the local InetSocketAddress which is bound currently. If more than one
* address are bound, only one of them will be returned, but it"s not
* necessarily the firstly bound address.
* This method overrides the {@link IoAcceptor#getLocalAddress} method.
返回本地當前綁定的報文地址。如果多於一個地址被綁定,其中一個將會被返回,不一定是第一個
綁定的地址
*/
@Override
InetSocketAddress getLocalAddress;

/**
* @return a {@link Set} of the local InetSocketAddress which are bound currently.
* This method overrides the {@link IoAcceptor#getDefaultLocalAddress} method.
獲取默認綁定的本地socket地址
*/
@Override
InetSocketAddress getDefaultLocalAddress;

/**
* Sets the default local InetSocketAddress to bind when no argument is specified in
* {@link #bind} method. Please note that the default will not be used
* if any local InetSocketAddress is specified.
* This method overrides the {@link IoAcceptor#setDefaultLocalAddress(java.net.SocketAddress)} method.
* 設置默認本地socket地址,如果本地地址初始化,則默認的socket地址不會被使用
* @param localAddress The local address
*/
void setDefaultLocalAddress(InetSocketAddress localAddress);

/**
* @return the {@link IoSessionRecycler} for this service.
service會話管理器
*/
IoSessionRecycler getSessionRecycler;

/**
* Sets the {@link IoSessionRecycler} for this service.
*
* @param sessionRecycler null to use the default recycler
*/
void setSessionRecycler(IoSessionRecycler sessionRecycler);

/**
* @return the default Datagram configuration of the new {@link IoSession}s
* created by this service.
獲取報文會話配置
*/
@Override
DatagramSessionConfig getSessionConfig;
}

回到報文監聽器NioDatagramAcceptor

/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
*
* @author [url=http://mina.apache.org]Apache MINA Project[/url]
* @org.apache.xbean.XBean
*/
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor {

/**
* A session recycler that is used to retrieve an existing session, unless it"s too old.
默認過期會話管理器
**/
private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler;
/**
* A timeout used for the select, as we need to get out to deal with idle
* sessions 選擇超時時間
*/
private static final long SELECT_TIMEOUT = 1000L;
/** A lock used to protect the selector to be waked up before it"s created */
private final Semaphore lock = new Semaphore(1);
/** A queue used to store the list of pending Binds 地址綁定請求*/
private final Queue registerQueue = new ConcurrentLinkedQueue<>;
//地址解綁請求隊列
private final Queue cancelQueue = new ConcurrentLinkedQueue<>;
//刷新會話隊列,IO處理器刷新操作會用到,暫存刷新操作的會話
private final Queue flushingSessions = new ConcurrentLinkedQueue<>;
// socket地址與報文通道映射Map,綁定操作使socket地址與報文通道關聯起來
private final Map boundHandles = Collections
.synchronizedMap(new HashMap);
//會話管理器sessionRecycler,監控連接Service的會話,如果會話過期,關閉過期的會話
private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture;
private volatile boolean selectable;
/** The thread responsible of accepting incoming requests */
private Acceptor acceptor;//監聽器線程
private long lastIdleCheckTime;//上次空閑檢查時間
/** The Selector used by this acceptor 選擇器*/
private volatile Selector selector;
}

從上面來看報文監聽器NioDatagramAcceptor,內部有一個註冊隊列registerQueue,用於存放地址綁定的請求,一個取消隊列,用於存放地址解綁請求,一個Map-boundHandles,用於存放socket地址與報文通道映射映射關係,會話管理器sessionRecycler,監控連接Service的會話,如果會話過期,關閉過期的會話,一個通道選擇器selector處理報文通道的讀寫操作事件,一個監聽器線程acceptor,用於處理地址綁定和解綁,報文通道讀寫事件,發送會話消息及銷毀監聽器工作。

再來看構造:

/**
* Creates a new instance.
*/
public NioDatagramAcceptor {
this(new DefaultDatagramSessionConfig, null);
}
/**
* Creates a new instance.
* 與上面不同的是,多一個IO事件執行器參數
* @param executor The executor to use
*/
public NioDatagramAcceptor(Executor executor) {
this(new DefaultDatagramSessionConfig, executor);
}
/**
* Creates a new instance.
與上面不同的是,多個會話配置參數
*/
private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
super(sessionConfig, executor);

try {
init;//初始化報文監聽器
selectable = true;
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeIoException("Failed to initialize.", e);
} finally {
if (!selectable) {
try {
destroy;
} catch (Exception e) {
ExceptionMonitor.getInstance.exceptionCaught(e);
}
}
}
}

來看初始化報文監聽器

init;//初始化報文監聽器
protected void init throws Exception {
//打開一個選擇器
this.selector = Selector.open;
}

從上面可以看出,報文監聽器構造主要是初始化會話配置,IO事件執行器和打開選擇器。

由於報文監聽器即實現了Io監聽器,有實現了Io處理器我們來看IO處理器的相關實現:

/**
* {@inheritDoc}
添加會話
*/
@Override
public void add(NioSession session) {
// Nothing to do for UDP
//由於報文通信是無連接的,添加會話操作實際為空
}
再來看發送會話寫請求:
/**
* {@inheritDoc}
*/
@Override
public void write(NioSession session, WriteRequest writeRequest) {
// We will try to write the message directly
long currentTime = System.currentTimeMillis;//獲取系統當前時間
//獲取會話寫請求隊列
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue;
//計算會話最大發送位元組數
final int maxWrittenBytes = session.getConfig.getMaxReadBufferSize
+ (session.getConfig.getMaxReadBufferSize >>> 1);

int writtenBytes = 0;

// Deal with the special case of a Message marker (no bytes in the request)
// We just have to return after having calle dthe messageSent event
//獲取會話寫請求buffer
IoBuffer buf = (IoBuffer) writeRequest.getMessage;

if (buf.remaining == 0) {
// Clear and fire event
//如果buffer中沒有數據,則置空會話當前寫請求,觸發會話發送事件
session.setCurrentWriteRequest(null);
buf.reset;
session.getFilterChain.fireMessageSent(writeRequest);
return;
}

// Now, write the data
try {
for (;;) {
if (writeRequest == null) {
//如果寫請求為空,則從請求隊列poll一個寫請求
writeRequest = writeRequestQueue.poll(session);
if (writeRequest == null) {
//取消關注寫事件
setInterestedInWrite(session, false);
break;
}
//設置會話當前寫請求
session.setCurrentWriteRequest(writeRequest);
}
//獲取寫請求buffer
buf = (IoBuf fer) writeRequest.getMessage;

if (buf.remaining == 0) {
// Clear and fire event
//如果buffer中沒有數據,則置空會話當前寫請求,觸發會話發送事件
session.setCurrentWriteRequest(null);
buf.reset;
session.getFilterChain.fireMessageSent(writeRequest);
continue;
}
//獲取寫請求目的socket地址
SocketAddress destination = writeRequest.getDestination;

if (destination == null) {
//寫請求目的地址為null,則獲取會話遠端socket地址
destination = session.getRemoteAddress;
}
//發送buffer數據到socket地址
int localWrittenBytes = send(session, buf, destination);

if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
// Kernel buffer is full or wrote too much
//如果buffer數據太多或沒有寫成功,添加寫請求到會話請求隊列,關注寫事件
setInterestedInWrite(session, true);
session.getWriteRequestQueue.offer(session, writeRequest);
scheduleFlush(session);
} else {
//則取消關注寫事件,置空會話當前寫請求,觸發會話發送事件
setInterestedInWrite(session, false);
// Clear and fire event
session.setCurrentWriteRequest(null);
writtenBytes += localWrittenBytes;
buf.reset;
session.getFilterChain.fireMessageSent(writeRequest);
break;
}
}
} catch (Exception e) {
session.getFilterChain.fireExceptionCaught(e);
} finally {
//更新會話寫位元組計數器
session.increaseWrittenBytes(writtenBytes, currentTime);
}
}

發送會話請求數據有一下幾點要關註:

1.

//設置會話寫事件

setInterestedInWrite(session, false);
protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
//獲取會話選擇key
SelectionKey key = session.getSelectionKey;
if (key == null) {
return;
}
int newInterestOps = key.interestOps;

if (isInterested) {
//設置關注寫事件
newInterestOps |= SelectionKey.OP_WRITE;
} else {
//取消關注寫事件
newInterestOps &= ~SelectionKey.OP_WRITE;
}
key.interestOps(newInterestOps);
}

2.

//發送buffer數據到socket地址
int localWrittenBytes = send(session, buf, destination);

//委託會話關聯的報文通道

protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
return ((DatagramChannel) session.getChannel).send(buffer.buf, remoteAddress);
}

3.

//調度刷新會話
scheduleFlush(session);

private boolean scheduleFlush(NioSession session) {
// Set the schedule for flush flag if the session
// has not already be added to the flushingSessions
// queue
//更新會話調度標誌為正在調度,添加會話到刷新隊列
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
return true;
} else {
return false;
}
}

從上面來看,報文監聽器寫操作,首先獲取會話寫請求隊列,計算會話最大發送位元組數,獲取會話寫請求buffer;如果寫請求為空,則從請求隊列poll一個寫請求,然後獲取寫請求buffer及寫請求目的socket地址,委託會話關聯的報文通道發送數據;如果buffer數據太多或沒有寫成功,添加寫請求到會話請求隊列,關注寫事件,重新調度刷新,否則取消關注寫事件,置空會話當前寫請求,觸發會話發送事件。

再來看刷新操作:

/**
* {@inheritDoc}
*/
@Override
public void flush(NioSession session) {
//添加會話到刷新隊列
if (scheduleFlush(session)) {
//喚醒選擇器
wakeup;
}
}

//喚醒選擇器
protected void wakeup {
selector.wakeup;
}

再來看其他操作

/**
* {@inheritDoc}
*/
@Override
public void updateTrafficControl(NioSession session) {
//不支持會話傳輸控制
throw new UnsupportedOperationException;
}
/**
* {@inheritDoc}
移除會話
*/
@Override
public void remove(NioSession session) {
//從會話回收器移除會話,通知service監聽器,會話移除,觸發fireSessionDestroyed事件
getSessionRecycler.remove(session);
getListeners.fireSessionDestroyed(session);
}

看完報文監聽器IO處理器的相關功能來看一下地址綁定

/**
* {@inheritDoc}
*/
@Override
protected final Set bindInternal(List localAddresses) throws Exception {
// Create a bind request as a Future operation. When the selector
// have handled the registration, it will signal this future.
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
// adds the Registration request to the queue for the Workers
// to handle
//添加地址綁定請求到註冊隊列
registerQueue.add(request);

// creates the Acceptor instance and has the local
// executor kick it off.
//啟動監聽器線程
startupAcceptor;

// As we just started the acceptor, we have to unblock the select
// in order to process the bind request we just have added to the
// registerQueue.
try {
lock.acquire;

// Wait a bit to give a chance to the Acceptor thread to do the select
Thread.sleep(10);
//喚醒選擇操作
wakeup;
} finally {
lock.release;
}

// Now, we wait until this request is completed.
//等待地址綁定完成
request.awaitUninterruptibly;

if (request.getException != null) {
throw request.getException;
}

// Update the local addresses.
// setLocalAddresses shouldn"t be called from the worker thread
// because of deadlock.
//handle綁定的地址集
Set newLocalAddresses = new HashSet<>;
for (DatagramChannel handle : boundHandles.values) {
newLocalAddresses.add(localAddress(handle));
}
return newLocalAddresses;
}

從上面來看綁定地址,首先添加地址綁定請求到註冊隊列registerQueue,啟動監聽器線程acceptor,喚醒選擇操作,然後等待地址綁定完成,最後返回報文通道綁定的socket地址集。

上面有幾點要關注:

1.

//啟動監聽器線程
startupAcceptor;

2.

//獲取報文通道綁定的socket地址
localAddress(handle)

先來看第二點:

protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
//獲取報文通道關聯socket綁定的本地socket地址
InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket.getLocalSocketAddress;
InetAddress inetAddress = inetSocketAddress.getAddress;

if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress)) {
// Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
// even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
// ANY address in the map.
byte ipV6Address = ((Inet6Address) inetAddress).getAddress;
byte ipV4Address = new byte[4];

System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);

InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort);
} else {
return inetSocketAddress;
}
}

再來看第一點:

/**
* Starts the inner Acceptor thread.
*/
private void startupAcceptor throws InterruptedException {
if (!selectable) {
//如果選擇器初始化失敗,則清空註冊隊列,取消隊列及刷新會話隊列
registerQueue.clear;
cancelQueue.clear;
flushingSessions.clear;
}
lock.acquire;
if (acceptor == null) {
//創建Acceptor線程實例,並執行
acceptor = new Acceptor;
executeWorker(acceptor);
} else {
lock.release;
}
}

下面來看一下Acceptor的定義:

/**
* This private class is used to accept incoming connection from
* clients. It"s an infinite loop, which can be stopped when all
* the registered handles have been removed (unbound).
接收客戶端的連接。主操作是一個無限循環,當所有綁定的地址的報文通道解綁時,
循環退出
*/
private class Acceptor implements Runnable {
@Override
public void run {
int nHandles = 0;
lastIdleCheckTime = System.currentTimeMillis;
// Release the lock
lock.release;
while (selectable) {
try {
//超時選擇
int selected = select(SELECT_TIMEOUT);
//處理地址綁定請求
nHandles += registerHandles;
if (nHandles == 0) {
try {
lock.acquire;
if (registerQueue.isEmpty && cancelQueue.isEmpty) {
acceptor = null;
break;
}
} finally {
lock.release;
}
}
if (selected > 0) {
//處理讀寫操作時間就緒的會話
processReadySessions(selectedHandles);
}
long currentTime = System.currentTimeMillis;
//發送刷新隊列中的寫請求
flushSessions(currentTime);
//處理報文通道地址解綁請求
nHandles -= unregisterHandles;
//通知會話空閑
notifyIdleSessions(currentTime);
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance.exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance.exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
}
}
}
//如何Io處理器正在關閉,則銷毀報文監聽器
if (selectable && isDisposing) {
selectable = false;
try {
destroy;
} catch (Exception e) {
ExceptionMonitor.getInstance.exceptionCaught(e);
} finally {
disposalFuture.setValue(true);
}
}
}
}

由於篇幅問題,監聽器線程acceptor,我們放到下一篇再講

總結:

報文監聽器NioDatagramAcceptor,內部有一個註冊隊列registerQueue,用於存放地址綁定的請求,一個取消隊列,用於存放地址解綁請求,一個Map-boundHandles,用於存放socket地址與報文通道映射映射關係,會話管理器sessionRecycler,監控連接Service的會話,如果會話過期,關閉過期的會話,一個通道選擇器selector處理報文通道的讀寫操作事件,一個監聽器線程acceptor,用於處理地址綁定和解綁,報文通道讀寫事件,發送會話消息及銷毀監聽器工作。報文監聽器構造主要是初始化會話配置,IO事件執行器和打開選擇器。報文監聽器寫操作,首先獲取會話寫請求隊列,計算會話最大發送位元組數,獲取會話寫請求buffer;如果寫請求為空,則從請求隊列poll一個寫請求,然後獲取寫請求buffer及寫請求目的socket地址,委託會話關聯的報文通道發送數據;如果buffer數據太多或沒有寫成功,添加寫請求到會話請求隊列,關注寫事件,否則取消關注寫事件,置空會話當前寫請求,觸發會話發送事件。綁定地址,首先添加地址綁定請求到註冊隊列registerQueue,啟動監聽器線程acceptor,喚醒選擇操作,然後等待地址綁定完成,最後返回報文通道綁定的socket地址集。

Mina 報文監聽器NioDatagramAcceptor二(發送會話消息據等):http://donald-draper.iteye.com/blog/2379228

附:

會話回收器IoSessionRecycler:

/**
* A connectionless transport can recycle existing sessions by assigning an
* {@link IoSessionRecycler} to an {@link IoService}.
*
* @author [url=http://mina.apache.org]Apache MINA Project[/url]
*/
public interface IoSessionRecycler {
/**
* A dummy recycler that doesn"t recycle any sessions. Using this recycler will
* make all session lifecycle events to be fired for every I/O for all connectionless
* sessions.
*/
IoSessionRecycler NOOP = new IoSessionRecycler {
/**
* {@inheritDoc}
*/
@Override
public void put(IoSession session) {
// Do nothing
}
/**
* {@inheritDoc}
*/
@Override
public IoSession recycle(SocketAddress remoteAddress) {
return null;
}
/**
* {@inheritDoc}
*/
@Override
public void remove(IoSession session) {
// Do nothing
}
};
/**
* Called when the underlying transport creates or writes a new {@link IoSession}.
*
* @param session the new {@link IoSession}.
*/
void put(IoSession session);
/**
* Attempts to retrieve a recycled {@link IoSession}.
*
* @param remoteAddress the remote socket address of the {@link IoSession} the transport wants to recycle.
* @return a recycled {@link IoSession}, or null if one cannot be found.
*/
IoSession recycle(SocketAddress remoteAddress);
/**
* Called when an {@link IoSession} is explicitly closed.
*
* @param session the new {@link IoSession}.
*/
void remove(IoSession session);
}

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

Grunt壓縮HTML和CSS
select源碼分析及小結
設置虛擬機網路連接方式

TAG:科技優家 |

您可能感興趣

人工智慧助聽器!ReSound Linx Quattro搭載Siri了
Servlet中幾個監聽器Listener的使用實例
iOS 12「隱藏功能」,AirPods 支持 Live Listen 變身助聽器
iOS12悄悄更新Live Listen!令AirPods 變成你的助聽器
Filter過濾器和Listener監聽器的的用法和區別
iOS 12再曝隱藏功能!Airpods或變監聽器
SpringBoot | 第七章:過濾器、監聽器、攔截器
不能來現場?小編把試聽器材錄音給大家聽——LALS classical 12 SE試聽會後記
iOS 12增添新功能:AirPods搖身一變為助聽器
iOS 12新功能可以讓AirPods變身助聽器
下一代 AirPods,或許可以參考一下「助聽器」
AirPods有新用途?小心被不法人士當竊聽器
Science子刊:新AI腦波助聽器讓聽障人士無障礙交流
下一代 AirPods,或許可以從助聽器里找到靈感
全球最大助聽器零售商Amplifon計劃18年底進入中國市場
翻譯、健康監測、AI 演算法加持,Livio AI 要做一款酷炫的助聽器
IOS 12 新功能AIRPODS 變助聽器?
可穿戴設備的未來在哪?蘋果AirPods可以變身助聽器?
AirPods 變身助聽器,可穿戴設備的應用值得更多期待
蘋果曝大漏洞!iPhone變成「竊聽器」,快關掉這個功能!