當前位置:
首頁 > 知識 > 「Netty」UDP廣播事件

「Netty」UDP廣播事件

一、前言

前面學習了WebSocket協議,並且通過示例講解了WebSocket的具體使用,接著學習如何使用無連接的UDP來廣播事件。

二、UDP廣播事件

2.1 UDP基礎

面向連接的TCP協議管理端到端的連接,在連接生命周期中,發送的消息會有序並且可靠地進行傳輸,最後連接有序地終止。然而,在無連接協議(如UDP)中,沒有持久連接的概念,每個消息(UDP數據報)都是獨立的傳輸,此外,UDP沒有TCP的糾錯機制(即每個對等體會確認其接收到的分組,並且發送者會重傳未確認的分組)。

UDP的限制比TCP多,但是比TCP快很多,這是因為消除了握手和消息管理的所有開銷,UDP非常適合處理或容忍消息丟失的應用。

2.2 UDP廣播

迄今為止所有的示例都使用了單播的傳輸模式,其被定義為將消息發送到由唯一地址標識的單個網路目的地,有連接和無連接的協議都支持這種模式,UDP為多個收件人發送消息提供了額外的傳輸模式:

· 組播--傳輸到定義的主機組。

· 廣播--傳輸到網路(或子網)上的所有主機。

本章中的示例將通過發送在同一網路上的所有主機接收的消息來使用UDP廣播。

2.3 UDP簡單示例

示例將打開一個文件,並通過UDP將每一行廣播為指定埠。下圖展示了應用的結構圖。

「Netty」UDP廣播事件

2.4 LogEvent POJO

在消息應用中,消息經常以POJO形式展現,LogEvent的POJO如下。

public final class LogEvent {
public static final byte SEPARATOR = (byte) ":";
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg) {
this(null, -1, logfile, msg);
}
public LogEvent(InetSocketAddress source, long received,
String logfile, String msg) {
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource {
return source;
}
public String getLogfile {
return logfile;
}
public String getMsg {
return msg;
}
public long getReceivedTimestamp {
return received;
}
}

2.5 編寫broadcaster

Netty提供了許多類來支持UDP應用程序,如Netty的DatagramPacket是DatagramChannel實現與遠程對等體進行通信的簡單消息容器,我們需要一個編碼器將EventLog消息轉換為DatagramPackets,可以擴展Netty的MessageToMessageEncoder,LogEventEncoder的代碼如下。

public class LogEventEncoder extends MessageToMessageEncoder {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
LogEvent logEvent, List out) throws Exception {
byte file = logEvent.getLogfile.getBytes(CharsetUtil.UTF_8);
byte msg = logEvent.getMsg.getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc
.buffer(file.length + msg.length + 1);
buf.writeBytes(file);
buf.writeByte(LogEvent.SEPARATOR);
buf.writeBytes(msg);
out.add(new DatagramPacket(buf, remoteAddress));
}
}

完成編碼器後,即可以開始啟動服務端,其中服務端LogEventBroadcaster的代碼如下。

public class LogEventBroadcaster {
private final Bootstrap bootstrap;
private final File file;
private final EventLoopGroup group;

public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup;
bootstrap = new Bootstrap;
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address));

this.file = file;
}

public void run throws IOException {
Channel ch = bootstrap.bind(0).syncUninterruptibly.channel;
System.out.println("LogEventBroadcaster running");
long pointer = 0;
for (;;) {
long len = file.length;
if (len < pointer) { // file was reset pointer = len; } else if (len > pointer) {
// Content was added
RandomAccessFile raf = new RandomAccessFile(file, "r");
raf.seek(pointer);
String line;
while ((line = raf.readLine) != null) {
ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath, line));
}
pointer = raf.getFilePointer;
raf.close;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted;
break;
}
}
}

public void stop {
group.shutdownGracefully;
}

public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException;
}

LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1]));
try {
broadcaster.run;
} finally {
broadcaster.stop;
}
}
}

2.6 編寫monitor

在應用中

· 接收由LogEventBroadcaster廣播的UDP DatagramPackets。

· 將其解碼為LogEvent。

· 將LogEvent寫入輸出流System.out。

下圖展示LogEvent的流動。

LogEventDecoder負責將傳入的DatagramPackets解碼為LogEvent消息,其代碼如下。

public class LogEventDecoder extends MessageToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List out) throws Exception {
ByteBuf data = datagramPacket.content;
int i = data.indexOf(0, data.readableBytes, LogEvent.SEPARATOR);
String filename = data.slice(0, i).toString(CharsetUtil.UTF_8);
String logMsg = data.slice(i + 1, data.readableBytes).toString(CharsetUtil.UTF_8);

LogEvent event = new LogEvent(datagramPacket.recipient, System.currentTimeMillis,
filename,logMsg);
out.add(event);
}
}

而LogEventHandler用於處理LogEvent,其代碼如下。

public class LogEventHandler extends SimpleChannelInboundHandler {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace;
ctx.close;
}

@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder;
builder.append(event.getReceivedTimestamp);
builder.append(" [");
builder.append(event.getSource.toString);
builder.append("] [");
builder.append(event.getLogfile);
builder.append("] : ");
builder.append(event.getMsg);

System.out.println(builder.toString);
}
}

LogEventMonitor用於將處理器添加至管道中,其代碼如下。

public class LogEventMonitor {

private final Bootstrap bootstrap;
private final EventLoopGroup group;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup;
bootstrap = new Bootstrap;
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline;
pipeline.addLast(new LogEventDecoder);
pipeline.addLast(new LogEventHandler);
}
}).localAddress(address);

}

public Channel bind {
return bootstrap.bind.syncUninterruptibly.channel;
}

public void stop {
group.shutdownGracefully;
}

public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("Usage: LogEventMonitor ");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
try {
Channel channel = monitor.bind;
System.out.println("LogEventMonitor running");

channel.closeFuture.await;
} finally {
monitor.stop;
}
}
}

運行LogEventBroadcaster和LogEventMonitor

三、總結

本篇博文講解了UDP協議,以及其示例,在實際應用中需要根據不同的應用場景選擇不同的協議,謝謝各位園友的觀看~

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

react 的五臟六腑ing~
6.類似Object監視器方法的Condition介面
RedHat7上安裝MySQL5.7.16

TAG:達人科技 |

您可能感興趣

Pの廣播◢Narrow
NABShow shanghai——新一代廣播級攝影機URSA Broadcast
Fate/Prototype 廣播劇CD第3卷詳情公開
挪威廣播電台Radio Norge更換新LOGO
聲優RAP企劃「HypnosisMic」廣播劇發布
Blackmagic重磅發布4K廣播級攝像機URSA Broadcast
Pの廣播◢特別回顧 Cross Space
哥倫比亞廣播公司CAO及CHOAnthony Ambrosio將離職
Pの廣播◢特別放送 Sound Tripper 第三回
《DARLING in the FRANKXX》BD第2捲髮售 附贈原畫集、小冊子、廣播劇CD
《Fate/Prototype 蒼銀的碎片》廣播劇延期發售
China Media Group ready for World Cup 2018 世界盃!中央廣播電視總台亮相紅場
Magic Leap One將於今夏面世;英國廣播公司宣布推出兩款VR遊戲
Pの廣播◢特別放送 Sound Tripper 第36回
Blackmagic發布4K URSA廣播機 價格再次逼死同行
音樂流媒體服務商Pandora 35億美元賣身衛星廣播巨頭SiriusXM
女性向手游《Starry Palette》2月7日重啟網路廣播:便於推廣遊戲消息
使用 RadioDroid 流傳輸網路廣播
Oculus Go更新:新增Casting廣播,萬聖節特輯和舉報功能
英國廣播公司宣布了首個Proms VR體驗