
Spark Source Code Analysis: ShuffleMapTask In-Memory Data Spill and Merge

Prerequisites

  • Hadoop version: Hadoop 2.6.0-cdh5.15.0
  • Spark version: Spark 1.6.0-cdh5.15.0
  • JDK: 1.8.0_191
  • Scala: 2.10.7

Topics Covered

  • How a Spark ShuffleMapTask spills in-memory data to temporary files
  • How the data in a temporary file is written: sorted by partition in ascending order, then by key in ascending order, as (key, value) pairs
  • How each temporary file records the number of (key, value) pairs per partition, plus an array holding the size of every stream flush
  • How the temporary files are merged into a single file
  • How the in-memory data and the temporary files are partitioned, sorted by key, and written into the merged file

Spilling In-Memory Data to Disk

  • The ShuffleMapTask reads the data of its current partition (at this point it is reading the current HDFS partition; note there is also a reduce partition: the ShuffleMapTask output file is already organised by reduce partition)
  • SparkEnv specifies SortShuffleManager as the default; getWriter() matches a BaseShuffleHandle and returns a SortShuffleWriter
  • SortShuffleWriter uses an ExternalSorter (an external-sorting object). The data from rdd.iterator(partition, context) is inserted, record by record, into the ExternalSorter's PartitionedAppendOnlyMap, which is the in-memory map. After each (key, value) pair is inserted, the current in-memory collection is checked; if the spill condition is met, the in-memory data is written to a SpillFile
  • The spill condition is checked once every 32 inserted records, and only when the estimated size of the current collection is at least 5 MB; an algorithm then verifies the memory impact to decide whether the in-memory data may be spilled, and if so all data currently in memory is written to a spillFile on disk (see the sketch after this list)
  • The spill file is handled via the org.apache.spark.util.collection.ExternalSorter.SpillableIterator.spill() method
  • A WritablePartitionedIterator iterates over the in-memory data and a DiskBlockObjectWriter writes it to disk; the records are written as (key, value) pairs, without the partition
  • ExternalSorter.spillMemoryIteratorToDisk() writes the in-memory iterator (WritablePartitionedIterator) into a temporary file, using a DiskBlockObjectWriter
  • The temporary file name has the form temp_shuffle_ + UUID
  • While iterating the in-memory data into the temporary file, it records how many (key, value) pairs each partition contributes to each temporary file (elementsPerPartition(partitionId) += 1). For large data sets it flushes to the file every 10,000 records by default, and the size of each flush is appended to the batchSizes ArrayBuffer
  • Before being written, the data is sorted: hash-partitioned by key, then ordered first by partition in ascending order and then by key in ascending order. Writing in this order makes it possible to separate each partition's data when a temporary file is read back; if one partition of a temporary file is large, it is read back in batches of 10,000 (key, value) pairs per stream, with each batch size recorded in batchSizes, which makes reading straightforward
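A minimal sketch of that spill check; the object, method and acquire parameter names below are illustrative, not Spark's API (the real check appears later in maybeSpill()):

// A minimal sketch of the spill check, not Spark's exact code: every 32 inserted
// records, once the estimated collection size reaches the threshold (5 MB initially),
// try to grow the threshold; spill only if not enough extra memory is granted.
object SpillCheckSketch {
  var elementsRead = 0L
  var myMemoryThreshold = 5L * 1024 * 1024   // assumed 5 MB initial threshold

  // `acquire` stands in for the memory manager; it returns how many bytes were granted.
  def maybeSpillSketch(estimatedSize: Long, acquire: Long => Long): Boolean = {
    elementsRead += 1
    if (elementsRead % 32 == 0 && estimatedSize >= myMemoryThreshold) {
      val amountToRequest = 2 * estimatedSize - myMemoryThreshold  // try to double our memory
      myMemoryThreshold += acquire(amountToRequest)
      estimatedSize >= myMemoryThreshold                           // still too big => spill
    } else {
      false
    }
  }
}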

Spill and Merge of In-Memory Data

  • insertAll() puts the data into the ExternalSorter; once that is done, if the data set was large there will have been spills to temporary files along the way
  • The data still in memory and the data in the temporary files are then merged into one data file. The merged file contains only (key, value) pairs, sorted by partition in ascending order and then by key in ascending order. The output file name is ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + UUID, i.e. "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" + "." + UUID, where reduceId is the default value 0
  • There is also an index file: "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" + "." + UUID. The index file stores the offset of each partition in turn (see the offset illustration after this list)
  • The data file is written in one of two ways. One is the purely in-memory path, with no temporary spill files on disk; this applies when the data is relatively small and is analysed separately
  • The other path has spill files on disk, and it is the case this article focuses on
  • ExternalSorter.partitionedIterator() handles all the spill files on disk together with the in-memory data and returns an iterable whose elements are the (partition, Iterator(key, value)) pairs consumed by the reduce side; the data inside each iterator is sorted by key in ascending order
  • Concretely, ExternalSorter.mergeWithAggregation() walks the current partition's data in every temporary file and in memory; note that the temporary files are read partition by partition, starting from partition 0
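The index file described above is conceptually a running sum of the per-partition lengths. A minimal illustration with invented values:

// Illustration only: turning per-partition byte lengths into index-file offsets.
val partitionLengths = Array(120L, 0L, 340L, 80L)   // bytes each partition occupies in the .data file
val offsets = partitionLengths.scanLeft(0L)(_ + _)  // Array(0, 120, 120, 460, 540)
// A reducer fetching partition i reads bytes [offsets(i), offsets(i + 1)) of the data file,
// so fetching partition 2 here means reading bytes [120, 460).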

Source Code Analysis (Spilling In-Memory Data to Disk)

ShuffleMapTask

  • ShuffleMapTask.runTask() processes the data of the current HDFS partition
  • SparkEnv.get.shuffleManager returns the SortShuffleManager
  • SortShuffleManager.getWriter() returns a SortShuffleWriter
  • SortShuffleWriter.write() is then called
  • The shuffle manager is registered in SparkEnv.create(), as shown below (a configuration sketch follows the runTask() listing)

val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
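A minimal configuration sketch showing how "sort" resolves to SortShuffleManager through the shortShuffleMgrNames map above; the app name and master are placeholders, and in Spark 1.6 "sort" is already the default, so the explicit set() is only for clarity:

// Minimal sketch (illustrative app name and master):
// "sort" resolves to org.apache.spark.shuffle.sort.SortShuffleManager
// through the shortShuffleMgrNames map shown above.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-demo")
  .setMaster("local[2]")
  .set("spark.shuffle.manager", "sort")
val sc = new SparkContext(conf)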

SortShuffleWriter

  • SortShuffleWriter.write() is called
  • Whether values are combined on the map side is decided by mapSideCombine on the ShuffleDependency, which is set by the operator: reduceByKey sets mapSideCombine = true, groupByKey sets it to false. Either way, a new ExternalSorter() is created to do the sorting (a short driver-side sketch follows the write() listing below)
  • The records are then inserted into the ExternalSorter instance sorter; the data source is the current HDFS partition

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
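The mapSideCombine flag mentioned above is determined by the operator that created the ShuffleDependency. A minimal driver-side sketch, reusing the SparkContext sc from the earlier configuration sketch:

// Sketch only: the same pair RDD shuffled through the two operators discussed above.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// reduceByKey sets mapSideCombine = true: values are pre-aggregated inside each
// ShuffleMapTask, so the ExternalSorter is built with an Aggregator.
val summed = pairs.reduceByKey(_ + _)

// groupByKey sets mapSideCombine = false: every (key, value) pair is shuffled as-is.
val grouped = pairs.groupByKey()

summed.collect()   // Array(("a", 4), ("b", 2)), order may vary
grouped.collect()  // Array(("a", [1, 3]), ("b", [2])), order may vary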

  • ExternalSorter.insertAll()
  • This method inserts the data from the records iterator into the external sorter
  • The data inside the ExternalSorter is not sorted at this point; it is stored as an array whose keys are (partition, key) and whose values are the results of the RDD chain computed before the shuffle. In memory, values with the same key are merged: this is the map-side local combine, and the merge function is the one defined by the operator, e.g. reduceByKey(_ + _) (a simplified illustration of this combine follows the insertAll() listing)
  • maybeSpillCollection() checks whether the spill-to-disk condition is met. If it is, the current in-memory data is written to disk, sorted by partition in ascending order and then by key in ascending order, as (key, value) pairs without the partition; the number of records per partition is recorded in elementsPerPartition(partitionId) and the size of every flush is appended to the batchSizes ArrayBuffer
  • The in-memory data lives in a PartitionedAppendOnlyMap; keep this object in mind, because its sorting comparator is used again later

@volatile private var map = new PartitionedAppendOnlyMap[K, C]

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
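To make the update closure above concrete, here is a simplified analogue of the map-side combine using a plain mutable Map instead of PartitionedAppendOnlyMap (illustrative code, not Spark's):

// Simplified analogue of the map-side combine, assuming reduceByKey(_ + _):
// createCombiner turns the first value for a key into the initial combiner,
// mergeValue folds every subsequent value into it.
import scala.collection.mutable

val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = _ + _

val combined = mutable.Map.empty[(Int, String), Int]        // map key is (partition, key)
val getPartition: String => Int = k => math.abs(k.hashCode) % 2

for ((k, v) <- Seq(("a", 1), ("b", 2), ("a", 3))) {
  val mapKey = (getPartition(k), k)
  combined(mapKey) = combined.get(mapKey) match {
    case Some(old) => mergeValue(old, v)                     // hadValue == true
    case None      => createCombiner(v)                      // hadValue == false
  }
}
// combined now holds ((partition, "a") -> 4) and ((partition, "b") -> 2)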

  • ExternalSorter.maybeSpillCollection()
  • estimatedSize is the estimated in-memory size of the current collection
  • maybeSpill() writes the in-memory data to a temporary file when the spill condition is met
  • It delegates to ExternalSorter.maybeSpill()

/**
 * Spill the current in-memory collection to disk if needed.
 *
 * @param usingMap whether we're using a map or buffer as our current in-memory collection
 */
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

  • ExternalSorter.maybeSpill()
  • While iterating over the in-memory data, every 32 elements it checks whether the current collection has grown to at least 5 MB; if so, it recalculates the available memory and spills only when not enough memory can be acquired, in which case the in-memory data is written to a temporary file (a worked example follows the listing below)
  • When the condition is met, ExternalSorter.spill() is called to write the in-memory data to a temporary file

/**
 * Spills the current in-memory collection to disk if needed. Attempts to acquire more
 * memory before spilling.
 *
 * @param collection collection to spill to disk
 * @param currentMemory estimated size of the collection in bytes
 * @return true if `collection` was spilled to disk; false otherwise
 */
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireOnHeapMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}

  • ExternalSorter.spill()
  • It calls collection.destructiveSortedWritablePartitionedIterator(comparator) to sort the data, i.e. PartitionedAppendOnlyMap.destructiveSortedWritablePartitionedIterator(), which ends up in WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator(). That method calls WritablePartitionedPairCollection.partitionedDestructiveSortedIterator(), which has no implementation there and is provided by the subclass PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator()
  • It then calls ExternalSorter.spillMemoryIteratorToDisk() to write the in-memory data into the spillFile temporary file

/**
 * Spill our in-memory collection to a sorted file that we can merge later.
 * We add this file into `spilledFiles` to find it later.
 *
 * @param collection whichever collection we're using (map or buffer)
 */
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills.append(spillFile)
}

  • PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator() uses the comparator WritablePartitionedPairCollection.partitionKeyComparator
  • That comparator orders first by partition ID in ascending order, then by key in ascending order (a usage sketch follows the listing below)

/**
 * Implementation of WritablePartitionedPairCollection that wraps a map in which the keys are tuples
 * of (partition ID, K)
 */
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    destructiveSortedIterator(comparator)
  }

  def insert(partition: Int, key: K, value: V): Unit = {
    update((partition, key), value)
  }
}

/**
 * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
 */
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
  new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      val partitionDiff = a._1 - b._1
      if (partitionDiff != 0) {
        partitionDiff
      } else {
        keyComparator.compare(a._2, b._2)
      }
    }
  }
}
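A small usage sketch of that ordering, sorting a few ((partition, key), value) entries with the same partition-then-key rule in plain Scala:

// Illustration of the (partition, key) ordering used when spilling:
// first by partition ID ascending, then by key ascending within the partition.
val entries = Seq(
  ((1, "banana"), 10),
  ((0, "cherry"), 30),
  ((1, "apple"),  20),
  ((0, "apple"),  40))

val sorted = entries.sortBy { case ((partition, key), _) => (partition, key) }
// Result: ((0, "apple"), 40), ((0, "cherry"), 30), ((1, "apple"), 20), ((1, "banana"), 10)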

  • ExternalSorter.spillMemoryIteratorToDisk()
  • It creates a blockId of the form temp_shuffle_ + UUID
  • The data is spilled to a temporary disk file named temp_shuffle_ + UUID
  • It iterates over the in-memory data (inMemoryIterator) and writes it into the spillFile temporary file
  • While doing so, it records how many (key, value) pairs each partition contributes to the file (elementsPerPartition(partitionId)). For large data sets it flushes to the file every 10,000 records by default, and the size of each flush is appended to the batchSizes ArrayBuffer (an illustration of the resulting SpilledFile metadata follows the listing below)

/**
 * Spill contents of in-memory iterator to a temporary file on disk.
 */
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  var spillMetrics: ShuffleWriteMetrics = null
  var writer: DiskBlockObjectWriter = null
  def openWriter(): Unit = {
    assert(writer == null && spillMetrics == null)
    spillMetrics = new ShuffleWriteMetrics
    writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
  }
  openWriter()

  // List of batch sizes (bytes) in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is closed at the end of this process, and cannot be reused.
  def flush(): Unit = {
    val w = writer
    writer = null
    w.commitAndClose()
    _diskBytesSpilled += spillMetrics.shuffleBytesWritten
    batchSizes.append(spillMetrics.shuffleBytesWritten)
    spillMetrics = null
    objectsWritten = 0
  }

  var success = false
  try {
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      if (objectsWritten == serializerBatchSize) {
        flush()
        openWriter()
      }
    }
    if (objectsWritten > 0) {
      flush()
    } else if (writer != null) {
      val w = writer
      writer = null
      w.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (!success) {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      if (writer != null) {
        writer.revertPartialWritesAndClose()
      }
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
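For intuition, this is roughly what the returned SpilledFile metadata could look like after spilling 25,000 records across 2 partitions with serializerBatchSize = 10000; all numbers are invented and the case class below is only a stand-in for Spark's private SpilledFile:

// Illustrative only: a SpilledFile-like snapshot after spilling 25,000 records.
// Three flushes occur (after 10,000, 20,000 and the final 5,000 records),
// so batchSizes has three entries; elementsPerPartition has one count per partition.
case class SpilledFileSketch(batchSizes: Array[Long], elementsPerPartition: Array[Long])

val sketch = SpilledFileSketch(
  batchSizes = Array(81920L, 81930L, 40960L),     // bytes written by each flush (made up)
  elementsPerPartition = Array(12000L, 13000L))   // partition 0: 12,000 records; partition 1: 13,000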

Source Code Analysis (Spill and Merge of In-Memory Data)

SortShuffleWriter.insertAll

  • If the in-memory data overflowed, it has already been written to temporary files; depending on the data size there may be several of them
  • All the temporary files now have to be shuffled into the (partition, Iterator) input for the reduce side. In effect, the multiple temporary files are merged into a single file whose contents are sorted by partition in ascending order and then by key in ascending order
  • SortShuffleWriter.write
  • shuffleBlockResolver.getDataFile returns the merged output file, named "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" + "." + UUID, where reduceId is the default 0
  • The key call is sorter.writePartitionedFile on the ExternalSorter; this is the method that actually builds the merged file
  • Its return value partitionLengths is the array of per-partition data sizes in the data file, indexed by partition from 0 to the maximum partition, and is used to build the index file

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}

  • ExternalSorter.writePartitionedFile
  • As the name says, it writes the data into a partitioned file
  • If there are no spill files, the ExternalSorter sorts the data directly in memory using TimSort; that path is covered separately and is not detailed here
  • If there are spill files, which is the case this article focuses on, it calls this.partitionedIterator, which returns data shaped like [(partition, Iterator)], sorted by partition in ascending order, with the (key, value) pairs inside each iterator sorted by key in ascending order
  • The result is written into the merged file, and the length written for each partition is returned in the lengths array, whose index is the partition ID

/**
 * Write all the data added into this ExternalSorter into a file in the disk store. This is
 * called by the SortShuffleWriter.
 *
 * @param blockId block ID to write to. The index file will be blockId.name + ".index".
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
 */
def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)

  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics.shuffleWriteMetrics.get)
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      writer.commitAndClose()
      val segment = writer.fileSegment()
      lengths(partitionId) = segment.length
    }
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get)
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
        writer.commitAndClose()
        val segment = writer.fileSegment()
        lengths(id) = segment.length
      }
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.internalMetricsToAccumulators(
    InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes)
  lengths
}

  • this.partitionedIterator()
  • It simply delegates to ExternalSorter.merge()
  • The temporary files are passed in through spills
  • The in-memory sort is triggered here by collection.partitionedDestructiveSortedIterator(comparator), which in practice is PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator with the partitionKeyComparator ordering, i.e. by partition in ascending order and then by key in ascending order

/**
 * Return an iterator over all the data written to this object, grouped by partition and
 * aggregated by the requested aggregator. For each partition we then have an iterator over its
 * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
 * partition without reading the previous one). Guaranteed to return a key-value pair for each
 * partition, in order of partition ID.
 *
 * For now, we just merge all the spilled files in once pass, but this can be modified to
 * support hierarchical merging.
 * Exposed for testing.
 */
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (!ordering.isDefined) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}

  • ExternalSorter.merge()
  • 0 until numPartitions loops over every partition from 0 up to, but not including, numPartitions
  • IteratorForPartition(p, inMemBuffered) takes the in-memory data of partition p on each iteration
  • readers reads all the temporary files for each partition (because every spill file may contain data for partition p)
  • readers.map(_.readNextPartition()) advances each reader by exactly one partition per call, starting from 0, so each call lines up with partition p
  • readNextPartition() is SpillReader.readNextPartition()
  • Partition p's data is combined with mergeWithAggregation() and then written into the merged file (a simplified k-way merge sketch follows the listing below)

/**
 * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
 * inside each partition. This can be used to either write out a new file or return data to
 * the user.
 *
 * Returns an iterator over all the data written to this object, grouped by partition. For each
 * partition we then have an iterator over its contents, and these are expected to be accessed
 * in order (you can't "skip ahead" to one partition without reading the previous one).
 * Guaranteed to return a key-value pair for each partition, in order of partition ID.
 */
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
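For intuition, a simplified k-way merge of already-sorted iterators, in the spirit of what mergeSort() and mergeWithAggregation() do for one partition; this is illustrative Scala, not Spark's implementation:

import scala.collection.mutable

// Merge several sorted iterators into one sorted iterator by always taking the
// iterator whose current head key is smallest (names here are illustrative).
def mergeSortedSketch[K, V](iters: Seq[Iterator[(K, V)]])(implicit ord: Ordering[K]): Iterator[(K, V)] = {
  // Order the heap by each iterator's head key; reverse because PriorityQueue is a max-heap.
  val heapOrdering = Ordering.by[BufferedIterator[(K, V)], K](_.head._1)(ord).reverse
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](heapOrdering)
  heap ++= iters.map(_.buffered).filter(_.hasNext)

  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val it = heap.dequeue()          // iterator with the smallest current key
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it)
      kv
    }
  }
}

// Example: merging three sorted streams, as merge() does for one partition's
// spill readers plus the in-memory iterator.
val merged = mergeSortedSketch(Seq(
  Iterator(("a", 1), ("c", 3)),
  Iterator(("b", 2)),
  Iterator(("a", 10), ("d", 4))))
// merged.toList yields all five pairs with keys in ascending order: "a", "a", "b", "c", "d"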

  • SpillReader.readNextPartition()
  • readNextItem() is the method that actually reads the temporary file
  • deserializeStream reads one batch at a time; the batch sizes were written into batchSizes when the spill file was produced. If a partition holds a lot of data it is split into batches of 10,000 records, so after every 10,000 records the reader switches to the next batch stream until all the batches belonging to the current partition have been read
  • On entry it runs nextBatchStream(), which is driven by the batchSizes array that stores the size of every batch written
  • val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _) yields exactly the starting offset of each batch: each offset is the sum of all the batch sizes before it (a small example follows the listings below)
  • When the current partition's stream is exhausted, readNextItem() returns null, meaning the current partition has been read completely
  • When partitionId == numPartitions, finished = true indicates that all partitions of all files have been read

def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] {
  val myPartition = nextPartitionToRead
  nextPartitionToRead += 1

  override def hasNext: Boolean = {
    if (nextItem == null) {
      nextItem = readNextItem()
      if (nextItem == null) {
        return false
      }
    }
    assert(lastPartitionId >= myPartition)
    // Check that we're still in the right partition; note that readNextItem will have returned
    // null at EOF above so we would've returned false there
    lastPartitionId == myPartition
  }

  override def next(): Product2[K, C] = {
    if (!hasNext) {
      throw new NoSuchElementException
    }
    val item = nextItem
    nextItem = null
    item
  }
}

/**
 * Return the next (K, C) pair from the deserialization stream and update partitionId,
 * indexInPartition, indexInBatch and such to match its location.
 *
 * If the current batch is drained, construct a stream for the next batch and read from it.
 * If no more pairs are left, return null.
 */
private def readNextItem(): (K, C) = {
  if (finished || deserializeStream == null) {
    return null
  }
  val k = deserializeStream.readKey().asInstanceOf[K]
  val c = deserializeStream.readValue().asInstanceOf[C]
  lastPartitionId = partitionId
  // Start reading the next batch if we're done with this one
  indexInBatch += 1
  if (indexInBatch == serializerBatchSize) {
    indexInBatch = 0
    deserializeStream = nextBatchStream()
  }
  // Update the partition location of the element we're reading
  indexInPartition += 1
  skipToNextPartition()
  // If we've finished reading the last partition, remember that we're done
  if (partitionId == numPartitions) {
    finished = true
    if (deserializeStream != null) {
      deserializeStream.close()
    }
  }
  (k, c)
}

/** Construct a stream that only reads from the next batch */
def nextBatchStream(): DeserializationStream = {
  // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
  // we're still in a valid batch.
  if (batchId < batchOffsets.length - 1) {
    if (deserializeStream != null) {
      deserializeStream.close()
      fileStream.close()
      deserializeStream = null
      fileStream = null
    }
    val start = batchOffsets(batchId)
    fileStream = new FileInputStream(spill.file)
    fileStream.getChannel.position(start)
    batchId += 1
    val end = batchOffsets(batchId)
    assert(end >= start, "start = " + start + ", end = " + end +
      ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
    val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
    val sparkConf = SparkEnv.get.conf
    val stream = blockManager.wrapForCompression(spill.blockId,
      CryptoStreamUtils.wrapForEncryption(bufferedStream, sparkConf))
    serInstance.deserializeStream(stream)
  } else {
    // No more batches left
    cleanup()
    null
  }
}
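The batchOffsets computation mentioned above is just a running sum. A quick illustration with invented batch sizes:

// scanLeft turns batch sizes into batch boundaries.
val serializerBatchSizes = Array(81920L, 81930L, 40960L)   // example sizes from a spill file
val batchOffsets = serializerBatchSizes.scanLeft(0L)(_ + _)
// batchOffsets == Array(0, 81920, 163850, 204810):
// batch i occupies bytes [batchOffsets(i), batchOffsets(i + 1)) of the spill file.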

end

Original author: thinktothings

https://my.oschina.net/u/723009/blog/2988340
