當前位置:
首頁 > 知識 > 「Hadoop」hadoop 文件上傳和下載分析

「Hadoop」hadoop 文件上傳和下載分析

Hadoop文件上傳和下載分析 首先說明下,hadoop的各種搭建方式不再介紹,相信各位玩hadoop的同學隨便都能搭出來。 樓主的環境: 操作系統:Ubuntu 15.10 hadoop版本:2.7.3 HA:否(隨便搭了個偽分散式) 文件上傳 下圖描述了Client向HDFS上傳一個200M大小的日誌文件的大致過程: 首先,Client發起文件上傳請求,即通過RPC與NameNode建立通訊。 NameNode與各DataNode使用心跳機制來獲取DataNode信息。NameNode收到Client請求後,獲取DataNode信息,並將可存儲文件的節點信息返回給Client。 Client收到NameNode返回的信息,與對應的DataNode節點取得聯繫,並向該節點寫文件。 文件寫入到DataNode後,以流水線的方式複製到其他DataNode(當然,這裡面也有DataNode向NameNode申請block,這裡不詳細介紹),至於複製多少份,與所配置的hdfs-default.xml中的dfs.replication相關。 元數據存儲 先明確幾個概念: fsimage:元數據鏡像文件。存儲某一時段NameNode內存元數據信息。 edits:操作日誌文件。 fstime:保存最近一次checkpoint的時間 checkpoint可在hdfs-default.xml中具體配置,默認為3600秒: 1 2 dfs.namenode.checkpoint.period 3 3600 4 The number of seconds between two periodic checkpoints. 5 6 fsimage和edits文件在namenode目錄可以看到: NameNode中的元數據信息: test.log文件上傳後,Namenode始終在內存中保存metedata,用於處理「讀請求」。metedata主要存儲了文件名稱(FileName),副本數量(replicas),分多少block存儲(block-ids),分別存儲在哪個節點上(id2host)等。 到有「寫請求」到來時,namenode會首先寫editlog到磁碟,即向edits文件中寫日誌,成功返回後,才會修改內存,並且向客戶端返回 hadoop會維護一個fsimage文件,也就是namenode中metedata的鏡像,但是fsimage不會隨時與namenode內存中的metedata保持一致,而是每隔一段時間通過合併edits文件來更新內容。此時Secondary namenode就派上用場了,合併fsimage和edits文件並更新NameNode的metedata。 Secondary namenode工作流程: secondary通知namenode切換edits文件 secondary通過http請求從namenode獲得fsimage和edits文件 secondary將fsimage載入內存,然後開始合併edits secondary將新的fsimage發回給namenode namenode用新的fsimage替換舊的fsimage 通過一張圖可以表示為: 文件下載 文件下載相對來說就簡單一些了,如圖所示,Client要從DataNode上,讀取test.log文件。而test.log由block1和block2組成。 文件下載的主要流程為: client向namenode發送請求。 namenode查看Metadata信息,返回test.log的block的位置。 Block1: h0,h1,h3 Block2: h0,h2,h4 開始從h0節點下載block1,block2。 源碼分析 我們先簡單使用hadoop提供的API來實現文件的上傳下載(文件刪除、改名等操作比較簡單,這裡不演示): 1 package cn.jon.hadoop.hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.FileOutputStream; 5 import java.io.IOException; 6 import java.io.InputStream; 7 import java.io.OutputStream; 8 import java.net.URI; 9 import java.net.URISyntaxException; 10 11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.IOUtils; 15 import org.junit.Before; 16 import org.junit.Test; 17 18 public class HDFSDemo { 19 FileSystem fs = null; 20 @Before 21 public void init(){ 22 try { 23 //初始化文件系統 24 fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root"); 25 } catch (IOException e) { 26 e.printStackTrace(); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } catch (URISyntaxException e) { 30 e.printStackTrace(); 31 } 32 } 33 public static void main(String[] args) { 34 35 } 36 @Test 37 /** 38 * 文件上傳 39 */ 40 public void testFileUpload(){ 41 try { 42 OutputStream os = fs.create(new Path("/test.log")); 43 FileInputStream fis = new FileInputStream("I://test.log"); 44 IOUtils.copyBytes(fis, os, 2048,true); 45 //可以使用hadoop提供的簡單方式 46 fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log")); 47 } catch (IllegalArgumentException | IOException e) { 48 e.printStackTrace(); 49 } 50 } 51 @Test 52 /** 53 * 文件下載 54 */ 55 public void testFileDownload(){ 56 try { 57 InputStream is = fs.open(new Path("/test.log")); 58 FileOutputStream fos = new FileOutputStream("E://test.log"); 59 IOUtils.copyBytes(is, fos, 2048); 60 //可以使用hadoop提供的簡單方式 61 fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log")); 62 } catch (IllegalArgumentException | IOException e) { 63 e.printStackTrace(); 64 } 65 } 66 67 } 顯而易見,只要是對hdfs上的文件進行操作,必須對FileSystem進行初始化,我們先來分析FileSystem的初始化: 1 public static FileSystem get(URI uri, Configuration conf) throws IOException { 2 return CACHE.get(uri, conf);//部分方法我只截取了部分代碼,這裡進入get()方法 3 } 1 FileSystem get(URI uri, Configuration conf) throws IOException{ 2 Key key = new Key(uri, conf); 3 return getInternal(uri, conf, key);//調用getInternal() 4 } 1 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{ 2 //使用單例模式創建FileSystem,這是由於FS的初始化需要大量的時間,使用單例保證只是第一次載入慢一些,返回FileSystem的子類實現DistributedFileSystem 3 FileSystem fs; 4 synchronized (this) { 5 fs = map.get(key); 6 } 7 if (fs != null) { 8 return fs; 9 } 10 11 fs = createFileSystem(uri, conf); 12 synchronized (this) { // refetch the lock again 13 FileSystem oldfs = map.get(key); 14 if (oldfs != null) { // a file system is created while lock is releasing 15 fs.close(); // close the new file system 16 return oldfs; // return the old file system 17 } 18 19 // now insert the new file system into the map 20 if (map.isEmpty() 21 && !ShutdownHookManager.get().isShutdownInProgress()) { 22 ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY); 23 } 24 fs.key = key; 25 map.put(key, fs); 26 if (conf.getBoolean("fs.automatic.close", true)) { 27 toAutoClose.add(key); 28 } 29 return fs; 30 } 31 } 1 public void initialize(URI uri, Configuration conf) throws IOException { 2 super.initialize(uri, conf); 3 setConf(conf); 4 5 String host = uri.getHost(); 6 if (host == null) { 7 throw new IOException("Incomplete HDFS URI, no host: "+ uri); 8 } 9 homeDirPrefix = conf.get( 10 DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY, 11 DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT); 12 13 this.dfs = new DFSClient(uri, conf, statistics);//實例化DFSClient,並將它作為DistributedFileSystem的引用,下面我們跟進去 14 this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); 15 this.workingDir = getHomeDirectory(); 16 } 1 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, 2 Configuration conf, FileSystem.Statistics stats) 3 throws IOException { 4 //該構造太長,樓主只截取了重要部分給大家展示,有感興趣的同學可以親手進源碼瞧瞧 5 NameNodeProxies.ProxyAndInfo proxyInfo = null; 6 //這裡聲明了NameNode的代理對象,跟我們前面討論的rpc就息息相關了 7 if (proxyInfo != null) { 8 this.dtService = proxyInfo.getDelegationTokenService(); 9 this.namenode = proxyInfo.getProxy(); 10 } else if (rpcNamenode != null) { 11 Preconditions.checkArgument(nameNodeUri == null); 12 this.namenode = rpcNamenode; 13 dtService = null; 14 } else { 15 Preconditions.checkArgument(nameNodeUri != null, 16 "null URI"); 17 proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, 18 ClientProtocol.class, nnFallbackToSimpleAuth); 19 this.dtService = proxyInfo.getDelegationTokenService(); 20 this.namenode = proxyInfo.getProxy();//獲取NameNode代理對象引用並自己持有,this.namenode類型為ClientProtocol,它是一個介面,我們看下這個介面 21 } 22 } 1 public interface ClientProtocol{ 2 public static final long versionID = 69L; 3 //還有很多對NameNode操作的方法申明,包括對文件上傳,下載,刪除等 4 //樓主特意把versionID貼出來了,這就跟我們寫的RPCDemo中的MyBizable介面完全類似,所以說Client一旦拿到該介面實現類的代理對象(NameNodeRpcServer),Client就可以實現與NameNode的RPC通信,我們繼續跟進 5 } 1 public static ProxyAndInfo createProxy(Configuration conf, 2 URI nameNodeUri, Class xface, AtomicBoolean fallbackToSimpleAuth) 3 throws IOException { 4 AbstractNNFailoverProxyProvider failoverProxyProvider = 5 createFailoverProxyProvider(conf, nameNodeUri, xface, true, 6 fallbackToSimpleAuth); 7 if (failoverProxyProvider == null) { 8 // 如果不是HA的創建方式,樓主環境是偽分散式,所以走這裡,我們跟進去 9 return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, 10 UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); 11 } else { 12 // 如果有HA的創建方式 13 Conf config = new Conf(conf); 14 T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, 15 RetryPolicies.failoverOnNetworkException( 16 RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, 17 config.maxRetryAttempts, config.failoverSleepBaseMillis, 18 config.failoverSleepMaxMillis)); 19 return new ProxyAndInfo(proxy, dtService, 20 NameNode.getAddress(nameNodeUri)); 21 } 22 } 最終返回的為ClientProtocol介面的子類代理對象,而NameNodeRpcServer類實現了ClientProtocol介面,因此返回的為NameNode的代理對象,當客戶端拿到了NameNode的代理對象後,即與NameNode建立了RPC通信: 1 private static ClientProtocol createNNProxyWithClientProtocol( 2 InetSocketAddress address, Configuration conf, UserGroupInformation ugi, 3 boolean withRetries, AtomicBoolean fallbackToSimpleAuth) 4 throws IOException { 5 RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是不是感覺越來越像我們前面說到的RPC 6 7 final RetryPolicy defaultPolicy = 8 RetryUtils.getDefaultRetryPolicy(//載入默認策虐 9 conf, 10 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 11 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 12 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, 13 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT, 14 SafeModeException.class); 15 16 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); 17 //看到versionId了嗎?這下明白了rpc的使用中目標介面必須要有這個欄位了吧 18 ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( 19 ClientNamenodeProtocolPB.class, version, address, ugi, conf, 20 NetUtils.getDefaultSocketFactory(conf), 21 org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, 22 fallbackToSimpleAuth).getProxy(); 23 //看到沒?這裡使用 RPC.getProtocolProxy()來創建ClientNamenodeProtocolPB對象,調試時可以清楚的看見,該對象引用的是一個代理對象,值為$Proxy12,由JDK的動態代理來實現。 24 //前面我們寫RPCDemo程序時,用的是RPC.getProxy(),但是各位大家可以去看RPC源碼,RPC.getProtocolProxy()最終還是調用的getProxy() 25 if (withRetries) { 26 Map methodNameToPolicyMap 27 = new HashMap(); 28 ClientProtocol translatorProxy = 29 new ClientNamenodeProtocolTranslatorPB(proxy); 30 return (ClientProtocol) RetryProxy.create(//這裡再次使用代理模式對代理對象進行包裝,也可以理解為裝飾者模式 31 ClientProtocol.class, 32 new DefaultFailoverProxyProvider( 33 ClientProtocol.class, translatorProxy), 34 methodNameToPolicyMap, 35 defaultPolicy); 36 } else { 37 return new ClientNamenodeProtocolTranslatorPB(proxy); 38 } 39 } 整個FileSystem的初始化用時序圖表示為: 到此,FileSystem的初始化就基本完成。由於文章篇幅過大的問題,所以樓主把HDFS原理及源碼分析拆分成了兩部分,上半部分主要是HDFS原理與FileSystem的初始化介紹,那在下半部分將會具體介紹HDFS文件上傳、下載的源碼解析。 另外,文章用到的一些示例代碼,將會在下半部分發布後,樓主一起上傳到GitHub。

「Hadoop」hadoop 文件上傳和下載分析

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

Windows NT MySQL 5.7安裝詳細(圖文)
iOS連wifi(修改密碼後的wifi)遇到的坑,純轉載

TAG:程序員小新人學習 |