JobPlus知识库 IT 大数据 文章
Spark BlockManager原理与源码分析

1、BlockManager原理示意图

①Driver上的BlockManagerMaster管理各个节点上BlockManager的元数据信息和维护block的状态信息。

②每个节点上BlockManager的每个组件:

        DiskStore:负责磁盘上的数据读写

        MemoryStore: 负责内存中的数据读写

        BlockManagerWorker: 负责远程节点的数据读写

        ConnectionMaster:负责建立远程BlockManager的通信连接

③BlockManager在进行数据的读写操作时,如RDD的运行中调用了presist()或中间生成一些数据,优先存入内存,内存存储不下,就存储到磁盘中

④Shuffle的读数据操作,从本地内存(MemoryStore)和磁盘(DiskStore)中读取数据,如果没有就从其他节点上使用ConnectionMaster建立连接,使用BlockManagerWorker下载数据

2、源码分析

①BlockManager的注册与维护

BlockManagerMaster使用BlockManagerMasterEndpoint(Actor)来负责executor和BlockManager的元数据管理

BlockManagerMasterEndpoint.scala

[plain]

  1. /**  
  2.   * 负责维护各个executor和BlockManager的元数据 BlockManagerInfo、BlockStatus  
  3.   */  
  4. private[spark]  
  5. class BlockManagerMasterEndpoint(  
  6.     override val rpcEnv: RpcEnv,  
  7.     val isLocal: Boolean,  
  8.     conf: SparkConf,  
  9.     listenerBus: LiveListenerBus)  
  10.   extends ThreadSafeRpcEndpoint with Logging {  
  11.   
  12.   // Mapping from block manager id to the block manager's information.  
  13.   // BlockManagerId-BlockManagerInfo的映射  
  14.   private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]  
  15.   
  16.   // Mapping from executor ID to block manager ID.  
  17.   // executorId - blockManagerId映射 每个executor是和一个BlockManager关联的  
  18.   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]  
  19.   
  20.   // Mapping from block id to the set of block managers that have the block.  
  21.   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]  
  22.       ...  
  23.   }  

注册BlockManagerInfo

[plain]

  1. // 注册blockManager  
  2.   private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {  
  3.     val time = System.currentTimeMillis()  
  4.   
  5.     // 判断是否注册过BlocManager  
  6.     if (!blockManagerInfo.contains(id)) {  
  7.   
  8.       // 根据executorId查找BlockManagerId  
  9.       blockManagerIdByExecutor.get(id.executorId) match {  
  10.           // 这里有一个安全判断,如果BlockManagerInfo map 中没有BlockManagerId  
  11.           // 那么对应的blockManagerIdByExecutorId map 也必须没有  
  12.         case Some(oldId) =>  
  13.           // A block manager of the same executor already exists, so remove it (assumed dead)  
  14.           logError("Got two different block manager registrations on same executor - "  
  15.               + s" will replace old one $oldId with new one $id")  
  16.   
  17.           // 所以,在这里做一下清理,移除executorId相关的BlockManagerInfo  
  18.           removeExecutor(id.executorId)  
  19.         case None =>  
  20.       }  
  21.       logInfo("Registering block manager %s with %s RAM, %s".format(  
  22.         id.hostPort, Utils.bytesToString(maxMemSize), id))  
  23.   
  24.       // 保存一份executorId到BlockManagerId的映射  
  25.       blockManagerIdByExecutor(id.executorId) = id  
  26.   
  27.       // 为BlockManagerId创建一个BlockManagerInfo  
  28.       //并保存一份BlockManagerId到BlockManagerInfo的映射  
  29.       blockManagerInfo(id) = new BlockManagerInfo(  
  30.         id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)  
  31.     }  
  32.     listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))  
  33.   }  

更新BlockManagerInfo

[plain]

  1. / 更新blockInfo, 即每个BlockManager上的block发生了变化  
  2.   // 都要发送updateBlockInfo请求,到BlockManagerMaster对BlockInfo进行更新  
  3.   private def updateBlockInfo(  
  4.       blockManagerId: BlockManagerId,  
  5.       blockId: BlockId,  
  6.       storageLevel: StorageLevel,  
  7.       memSize: Long,  
  8.       diskSize: Long,  
  9.       externalBlockStoreSize: Long): Boolean = {  
  10.   
  11.     if (!blockManagerInfo.contains(blockManagerId)) {  
  12.       if (blockManagerId.isDriver && !isLocal) {  
  13.         // We intentionally do not register the master (except in local mode),  
  14.         // so we should not indicate failure.  
  15.         return true  
  16.       } else {  
  17.         return false  
  18.       }  
  19.     }  
  20.   
  21.     if (blockId == null) {  
  22.       blockManagerInfo(blockManagerId).updateLastSeenMs()  
  23.       return true  
  24.     }  
  25.   
  26.     blockManagerInfo(blockManagerId).updateBlockInfo(  
  27.       blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)  
  28.   
  29.     // 每一个block可能会在多个BlockManager上  
  30.     // 根据block的存储级别StoreLevel,设置为_2的,就需要将block 备份到其他BlockManager上  
  31.     //  location map 保存了每个blockId的对应的BlockManagerId集合  
  32.     // 因为使用的是set存储,所以自动去重  
  33.     var locations: mutable.HashSet[BlockManagerId] = null  
  34.     if (blockLocations.containsKey(blockId)) {  
  35.       locations = blockLocations.get(blockId)  
  36.     } else {  
  37.       locations = new mutable.HashSet[BlockManagerId]  
  38.       blockLocations.put(blockId, locations)  
  39.     }  
  40.   
  41.     if (storageLevel.isValid) {  
  42.       locations.add(blockManagerId)  
  43.     } else {  
  44.       locations.remove(blockManagerId)  
  45.     }  
  46.   
  47.     // Remove the block from master tracking if it has been removed on all slaves.  
  48.     if (locations.size == 0) {  
  49.       blockLocations.remove(blockId)  
  50.     }  
  51.     true  
  52.   }  

[plain] 

  1. private[spark] class BlockManagerInfo(  
  2.     val blockManagerId: BlockManagerId,  
  3.     timeMs: Long,  
  4.     val maxMem: Long,  
  5.     val slaveEndpoint: RpcEndpointRef)  
  6.   extends Logging {  
  7.   
  8. ...  
  9.   // Mapping from block id to its status.  
  10.   // blockId-BlockStatus的映射  
  11.   private val _blocks = new JHashMap[BlockId, BlockStatus]  
  12.   
  13. ...  
  14. def updateBlockInfo(  
  15.       blockId: BlockId,  
  16.       storageLevel: StorageLevel,  
  17.       memSize: Long,  
  18.       diskSize: Long,  
  19.       externalBlockStoreSize: Long) {  
  20.   
  21.     updateLastSeenMs()  
  22.   
  23.     //判断内部是否有block  
  24.     if (_blocks.containsKey(blockId)) {  
  25.       // The block exists on the slave already.  
  26.       val blockStatus: BlockStatus = _blocks.get(blockId)  
  27.       val originalLevel: StorageLevel = blockStatus.storageLevel  
  28.       val originalMemSize: Long = blockStatus.memSize  
  29.   
  30.       // 判断storeLevel是否使用内存,是就给剩余内存数量加上当前内存数量  
  31.       if (originalLevel.useMemory) {  
  32.         _remainingMem += originalMemSize  
  33.       }  
  34.     }  
  35.   
  36.     // 给block创建一个BlockStatus,然后根据持久化级别,对相应的内存资源进行计算  
  37.     if (storageLevel.isValid) {  
  38.       /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.  
  39.        * The memSize here indicates the data size in or dropped from memory,  
  40.        * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,  
  41.        * and the diskSize here indicates the data size in or dropped to disk.  
  42.        * They can be both larger than 0, when a block is dropped from memory to disk.  
  43.        * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */  
  44.       var blockStatus: BlockStatus = null  
  45.       if (storageLevel.useMemory) {  
  46.         blockStatus = BlockStatus(storageLevel, memSize, 0, 0)  
  47.         _blocks.put(blockId, blockStatus)  
  48.         _remainingMem -= memSize  
  49.         logInfo("Added %s in memory on %s (size: %s, free: %s)".format(  
  50.           blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),  
  51.           Utils.bytesToString(_remainingMem)))  
  52.       }  
  53.       if (storageLevel.useDisk) {  
  54.         blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)  
  55.         _blocks.put(blockId, blockStatus)  
  56.         logInfo("Added %s on disk on %s (size: %s)".format(  
  57.           blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))  
  58.       }  
  59.       if (storageLevel.useOffHeap) {  
  60.         blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)  
  61.         _blocks.put(blockId, blockStatus)  
  62.         logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(  
  63.           blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))  
  64.       }  
  65.       if (!blockId.isBroadcast && blockStatus.isCached) {  
  66.         _cachedBlocks += blockId  
  67.       }  
  68. }  

②BlockManager和 BlockManager之间的数据传输


BlockManager.scala

初始化组件

[plain] 

  1. private[spark] class BlockManager(  
  2.     executorId: String,  
  3.     rpcEnv: RpcEnv,  
  4.     val master: BlockManagerMaster,  
  5.     defaultSerializer: Serializer,  
  6.     val conf: SparkConf,  
  7.     memoryManager: MemoryManager,  
  8.     mapOutputTracker: MapOutputTracker,  
  9.     shuffleManager: ShuffleManager,  
  10.     blockTransferService: BlockTransferService,  
  11.     securityManager: SecurityManager,  
  12.     numUsableCores: Int)  
  13.   extends BlockDataManager with Logging {  
  14.   
  15.   val diskBlockManager = new DiskBlockManager(this, conf)  
  16.   
  17.   // 每个BlockManager,自己维护了一个map  blockId-blockInfo的映射  
  18.   // blockInfo就代表着一份block,其最大作用是作为多线程访问同一个block的同步监视器  
  19.   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]  
  20.   
  21.   private[spark] val memoryStore = new MemoryStore(this, memoryManager)  
  22.   private[spark] val diskStore = new DiskStore(this, diskBlockManager)  
  23.  def initialize(appId: String): Unit = {  
  24.   
  25.     // 初始化 ,blockTransferService用于远程block数据传输  
  26.     blockTransferService.init(this)  
  27.     shuffleClient.init(appId)  
  28.   
  29.   
  30.     // 为blockManager创建一个对应的BlockManagerId  
  31.     // 一个BlockManager是通过一个节点上的Executor来唯一标识的  
  32.     blockManagerId = BlockManagerId(  
  33.       executorId, blockTransferService.hostName, blockTransferService.port)  
  34.   
  35.     shuffleServerId = if (externalShuffleServiceEnabled) {  
  36.       logInfo(s"external shuffle service port = $externalShuffleServicePort")  
  37.       BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)  
  38.     } else {  
  39.       blockManagerId  
  40.     }  
  41.   
  42.     // 使用BlockManagerMasterEndpoint的引用,进行BlockManager的注册  
  43.     // 发送消息到BlockManagerMasterEndpoint上  
  44.     master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)  
  45.   
  46.     // Register Executors' configuration with the local shuffle service, if one should exist.  
  47.     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {  
  48.       registerWithExternalShuffleServer()  
  49.     }  
  50.   }  
  51. ...  

   (1) 从本地读取数据

BlockManager.scala

读取数据

[plain]

  1. // 从本地获取数据  
  2.   private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {  
  3.     // 尝试获取block的对应blockInfo的锁  
  4.     val info = blockInfo.get(blockId).orNull  
  5.     if (info != null) {  
  6.       //对所有的BlockInfo,都会进行多线程同步访问  
  7.       // blockInfo相当于是对block,多线程并发访问的监视器  
  8.       info.synchronized {  
  9.         // Double check to make sure the block is still there. There is a small chance that the  
  10.         // block has been removed by removeBlock (which also synchronizes on the blockInfo object).  
  11.         // Note that this only checks metadata tracking. If user intentionally deleted the block  
  12.         // on disk or from off heap storage without using removeBlock, this conditional check will  
  13.         // still pass but eventually we will get an exception because we can't find the block.  
  14.         if (blockInfo.get(blockId).isEmpty) {  
  15.           logWarning(s"Block $blockId had been removed")  
  16.           return None  
  17.         }  
  18.   
  19.         // If another thread is writing the block, wait for it to become ready.  
  20.         // 如果其他线程在操作当前需要访问的block,就会等待获取BlockInfo的排它锁  
  21.         // 如果始终没有获取到,就返回  
  22.         if (!info.waitForReady()) {  
  23.           // If we get here, the block write failed.  
  24.           logWarning(s"Block $blockId was marked as failure.")  
  25.           return None  
  26.         }  
  27.   
  28.         val level = info.level  
  29.         logDebug(s"Level for block $blockId is $level")  
  30.   
  31.         // Look for the block in memory  
  32.         if (level.useMemory) {  
  33.           logDebug(s"Getting block $blockId from memory")  
  34.           val result = if (asBlockResult) {  
  35.             memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))  
  36.           } else {  
  37.             memoryStore.getBytes(blockId)  
  38.           }  
  39.           result match {  
  40.             case Some(values) =>  
  41.               return result  
  42.             case None =>  
  43.               logDebug(s"Block $blockId not found in memory")  
  44.           }  
  45.         }  
  46.   
  47.         // Look for the block in external block store  
  48.         if (level.useOffHeap) {  
  49.           logDebug(s"Getting block $blockId from ExternalBlockStore")  
  50.           if (externalBlockStore.contains(blockId)) {  
  51.             val result = if (asBlockResult) {  
  52.               externalBlockStore.getValues(blockId)  
  53.                 .map(new BlockResult(_, DataReadMethod.Memory, info.size))  
  54.             } else {  
  55.               externalBlockStore.getBytes(blockId)  
  56.             }  
  57.             result match {  
  58.               case Some(values) =>  
  59.                 return result  
  60.               case None =>  
  61.                 logDebug(s"Block $blockId not found in ExternalBlockStore")  
  62.             }  
  63.           }  
  64.         }  
  65.   
  66.         // Look for block on disk, potentially storing it back in memory if required  
  67.         if (level.useDisk) {  
  68.           logDebug(s"Getting block $blockId from disk")  
  69.           val bytes: ByteBuffer = diskStore.getBytes(blockId) match {  
  70.             case Some(b) => b  
  71.             case None =>  
  72.               throw new BlockException(  
  73.                 blockId, s"Block $blockId not found on disk, though it should be")  
  74.           }  
  75.           assert(0 == bytes.position())  
  76.   
  77.           if (!level.useMemory) {  
  78.             // If the block shouldn't be stored in memory, we can just return it  
  79.             if (asBlockResult) {  
  80.               // 反序列化  
  81.               return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,  
  82.                 info.size))  
  83.             } else {  
  84.               return Some(bytes)  
  85.             }  
  86.           } else {  
  87.             // Otherwise, we also have to store something in the memory store  
  88.             if (!level.deserialized || !asBlockResult) {  
  89.               /* We'll store the bytes in memory if the block's storage level includes  
  90.                * "memory serialized", or if it should be cached as objects in memory  
  91.                * but we only requested its serialized bytes. */  
  92.               memoryStore.putBytes(blockId, bytes.limit, () => {  
  93.                 // https://issues.apache.org/jira/browse/SPARK-6076  
  94.                 // If the file size is bigger than the free memory, OOM will happen. So if we cannot  
  95.                 // put it into MemoryStore, copyForMemory should not be created. That's why this  
  96.                 // action is put into a `() => ByteBuffer` and created lazily.  
  97.   
  98.                 // 如果即使用了Disk级别,又使用了memory级别,就从disk中读取出来后,  
  99.                 // 尝试放入内存中  
  100.                 val copyForMemory = ByteBuffer.allocate(bytes.limit)  
  101.                 copyForMemory.put(bytes)  
  102.               })  
  103.               bytes.rewind()  
  104.             }  
  105.             if (!asBlockResult) {  
  106.               return Some(bytes)  
  107.             } else {  
  108.               val values = dataDeserialize(blockId, bytes)  
  109.               if (level.deserialized) {  
  110.                 // Cache the values before returning them  
  111.                 val putResult = memoryStore.putIterator(  
  112.                   blockId, values, level, returnValues = true, allowPersistToDisk = false)  
  113.                 // The put may or may not have succeeded, depending on whether there was enough  
  114.                 // space to unroll the block. Either way, the put here should return an iterator.  
  115.                 putResult.data match {  
  116.                   case Left(it) =>  
  117.                     return Some(new BlockResult(it, DataReadMethod.Disk, info.size))  
  118.                   case _ =>  
  119.                     // This only happens if we dropped the values back to disk (which is never)  
  120.                     throw new SparkException("Memory store did not return an iterator!")  
  121.                 }  
  122.               } else {  
  123.                 return Some(new BlockResult(values, DataReadMethod.Disk, info.size))  
  124.               }  
  125.             }  
  126.           }  
  127.         }  
  128.       }  
  129.     } else {  
  130.       logDebug(s"Block $blockId not registered locally")  
  131.     }  
  132.     None  
  133.   }  

MemoryStore.scala

[plain] 

  1. override def getBytes(blockId: BlockId): Option[ByteBuffer] = {  
  2.     // 多线程并发访问同步的  
  3.     val entry = entries.synchronized {  
  4.       // 尝试从内存中获取数据  
  5.       entries.get(blockId)  
  6.     }  
  7.   
  8.     if (entry == null) {  
  9.       // 没有获取到就返回null  
  10.       None  
  11.     } else if (entry.deserialized) {  
  12.       // 获取到的是非序列化数据,将其序列化后返回  
  13.       Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))  
  14.     } else {  
  15.       // 序列化数据,直接返回  
  16.       Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data  
  17.     }  
  18.   }  

getValues()方法与getBytes()方法相反,需要拿到的是文本数据

[plain] 

  1. override def getValues(blockId: BlockId): Option[Iterator[Any]] = {  
  2.     val entry = entries.synchronized {  
  3.       entries.get(blockId)  
  4.     }  
  5.     if (entry == null) {  
  6.       None  
  7.     } else if (entry.deserialized) {  
  8.       Some(entry.value.asInstanceOf[Array[Any]].iterator)  
  9.     } else {  
  10.       // 序列化的数据,反序列化  
  11.       val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data  
  12.       Some(blockManager.dataDeserialize(blockId, buffer))  
  13.     }  
  14.   }  

DiskStore.scala

[plain] 

  1. private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {  
  2.     // 使用java的 nio进行文件的读写操作  
  3.     val channel = new RandomAccessFile(file, "r").getChannel  
  4.     Utils.tryWithSafeFinally {  
  5.       // For small files, directly read rather than memory map  
  6.       if (length < minMemoryMapBytes) {  
  7.         val buf = ByteBuffer.allocate(length.toInt)  
  8.         channel.position(offset)  
  9.         while (buf.remaining() != 0) {  
  10.           if (channel.read(buf) == -1) {  
  11.             throw new IOException("Reached EOF before filling buffer\n" +  
  12.               s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")  
  13.           }  
  14.         }  
  15.         buf.flip()  
  16.         Some(buf)  
  17.       } else {  
  18.         Some(channel.map(MapMode.READ_ONLY, offset, length))  
  19.       }  
  20.     } {  
  21.       channel.close()  
  22.     }  
  23.   }  

[plain]

  1. override def getValues(blockId: BlockId): Option[Iterator[Any]] = {  
  2.    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))  
  3.  }  

总结:

    ①先从内存中读取数据,再从磁盘中读取

    ②如果读取的数据使用了内存,又使用了磁盘,将从磁盘中读取的数据写入到内存

    ③数据的读取过程使用了多线程同步访问,保证数据读取的安全

(2)远程读取数据

BlockManager.scala

[plain] 

  1. private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {  
  2.     require(blockId != null, "BlockId is null")  
  3.   
  4.     // 从BlockManagerMaster上,获取blockId对应的BlockManager信息  
  5.     // 然后随机打乱  
  6.     val locations = Random.shuffle(master.getLocations(blockId))  
  7.     var numFetchFailures = 0  
  8.   
  9.     // 遍历BlockManager  
  10.     for (loc <- locations) {  
  11.       logDebug(s"Getting remote block $blockId from $loc")  
  12.   
  13.       // 使用blockTransferService,进行异步的远程网络获取block数据  
  14.       val data = try {  
  15.         blockTransferService.fetchBlockSync(  
  16.           loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()  
  17.       } catch {  
  18.         case NonFatal(e) =>  
  19.           numFetchFailures += 1  
  20.           if (numFetchFailures == locations.size) {  
  21.             // An exception is thrown while fetching this block from all locations  
  22.             throw new BlockFetchException(s"Failed to fetch block from" +  
  23.               s" ${locations.size} locations. Most recent failure cause:", e)  
  24.           } else {  
  25.             // This location failed, so we retry fetch from a different one by returning null here  
  26.             logWarning(s"Failed to fetch remote block $blockId " +  
  27.               s"from $loc (failed attempt $numFetchFailures)", e)  
  28.             null  
  29.           }  
  30.       }  
  31.   
  32.       if (data != null) {  
  33.         if (asBlockResult) {  
  34.           return Some(new BlockResult(  
  35.             // 反序列化  
  36.             dataDeserialize(blockId, data),  
  37.             DataReadMethod.Network,  
  38.             data.limit()))  
  39.         } else {  
  40.           return Some(data)  
  41.         }  
  42.       }  
  43.       logDebug(s"The value of block $blockId is null")  
  44.     }  
  45.     logDebug(s"Block $blockId not found")  
  46.     None  
  47.   }  

(3) 写数据

BlockManager.scala

[plain]

  1. private def doPut(  
  2.       blockId: BlockId,  
  3.       data: BlockValues,  
  4.       level: StorageLevel,  
  5.       tellMaster: Boolean = true,  
  6.       effectiveStorageLevel: Option[StorageLevel] = None)  
  7.     : Seq[(BlockId, BlockStatus)] = {  
  8.   
  9.     require(blockId != null, "BlockId is null")  
  10.     require(level != null && level.isValid, "StorageLevel is null or invalid")  
  11.     effectiveStorageLevel.foreach { level =>  
  12.       require(level != null && level.isValid, "Effective StorageLevel is null or invalid")  
  13.     }  
  14.   
  15.     // Return value  
  16.     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]  
  17.   
  18.     /* Remember the block's storage level so that we can correctly drop it to disk if it needs  
  19.      * to be dropped right after it got put into memory. Note, however, that other threads will  
  20.      * not be able to get() this block until we call markReady on its BlockInfo. */  
  21.     // 为要写入的block,创建一个BlockInfo,并放入BlockInfo map中  
  22.     val putBlockInfo = {  
  23.       val tinfo = new BlockInfo(level, tellMaster)  
  24.       // Do atomically !  
  25.       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)  
  26.       if (oldBlockOpt.isDefined) {  
  27.         if (oldBlockOpt.get.waitForReady()) {  
  28.           logWarning(s"Block $blockId already exists on this machine; not re-adding it")  
  29.           return updatedBlocks  
  30.         }  
  31.         // TODO: So the block info exists - but previous attempt to load it (?) failed.  
  32.         // What do we do now ? Retry on it ?  
  33.         oldBlockOpt.get  
  34.       } else {  
  35.         tinfo  
  36.       }  
  37.     }  
  38.   
  39.     val startTimeMs = System.currentTimeMillis  
  40.   
  41.     /* If we're storing values and we need to replicate the data, we'll want access to the values,  
  42.      * but because our put will read the whole iterator, there will be no values left. For the  
  43.      * case where the put serializes data, we'll remember the bytes, above; but for the case where  
  44.      * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */  
  45.     var valuesAfterPut: Iterator[Any] = null  
  46.   
  47.     // Ditto for the bytes after the put  
  48.     var bytesAfterPut: ByteBuffer = null  
  49.   
  50.     // Size of the block in bytes  
  51.     var size = 0L  
  52.   
  53.     // The level we actually use to put the block  
  54.     val putLevel = effectiveStorageLevel.getOrElse(level)  
  55.   
  56.     // If we're storing bytes, then initiate the replication before storing them locally.  
  57.     // This is faster as data is already serialized and ready to send.  
  58.     val replicationFuture = data match {  
  59.       case b: ByteBufferValues if putLevel.replication > 1 =>  
  60.         // Duplicate doesn't copy the bytes, but just creates a wrapper  
  61.         val bufferView = b.buffer.duplicate()  
  62.         Future {  
  63.           // This is a blocking action and should run in futureExecutionContext which is a cached  
  64.           // thread pool  
  65.           replicate(blockId, bufferView, putLevel)  
  66.         }(futureExecutionContext)  
  67.       case _ => null  
  68.     }  
  69.   
  70.     // 对BlockInfo加锁,进行多线程并发访问同步  
  71.     putBlockInfo.synchronized {  
  72.       logTrace("Put for block %s took %s to get into synchronized block"  
  73.         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))  
  74.   
  75.       var marked = false  
  76.       try {  
  77.         // returnValues - Whether to return the values put  
  78.         // blockStore - The type of storage to put these values into  
  79.         // 根据持久化级别,选择一种BlockStore, MemoryStore, DiskStore等  
  80.         val (returnValues, blockStore: BlockStore) = {  
  81.           if (putLevel.useMemory) {  
  82.             // Put it in memory first, even if it also has useDisk set to true;  
  83.             // We will drop it to disk later if the memory store can't hold it.  
  84.             (true, memoryStore)  
  85.           } else if (putLevel.useOffHeap) {  
  86.             // Use external block store  
  87.             (false, externalBlockStore)  
  88.           } else if (putLevel.useDisk) {  
  89.             // Don't get back the bytes from put unless we replicate them  
  90.             (putLevel.replication > 1, diskStore)  
  91.           } else {  
  92.             assert(putLevel == StorageLevel.NONE)  
  93.             throw new BlockException(  
  94.               blockId, s"Attempted to put block $blockId without specifying storage level!")  
  95.           }  
  96.         }  
  97.   
  98.         // Actually put the values  
  99.         // 根据store级别,数据的类型,把数据放入store中  
  100.         val result = data match {  
  101.           case IteratorValues(iterator) =>  
  102.             blockStore.putIterator(blockId, iterator, putLevel, returnValues)  
  103.           case ArrayValues(array) =>  
  104.             blockStore.putArray(blockId, array, putLevel, returnValues)  
  105.           case ByteBufferValues(bytes) =>  
  106.             bytes.rewind()  
  107.             blockStore.putBytes(blockId, bytes, putLevel)  
  108.         }  
  109.         size = result.size  
  110.         result.data match {  
  111.           case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator  
  112.           case Right (newBytes) => bytesAfterPut = newBytes  
  113.           case _ =>  
  114.         }  
  115.   
  116.         // Keep track of which blocks are dropped from memory  
  117.         if (putLevel.useMemory) {  
  118.           result.droppedBlocks.foreach { updatedBlocks += _ }  
  119.         }  
  120.   
  121.         // 获取到一个Block对应的BlockStatus  
  122.         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)  
  123.         if (putBlockStatus.storageLevel != StorageLevel.NONE) {  
  124.           // Now that the block is in either the memory, externalBlockStore, or disk store,  
  125.           // let other threads read it, and tell the master about it.  
  126.           marked = true  
  127.           putBlockInfo.markReady(size)  
  128.           if (tellMaster) {  
  129.             // 将新写入的block数据,发送给BlockManagerMasterEndpoint  
  130.             // 进行block元数据的同步和维护  
  131.             reportBlockStatus(blockId, putBlockInfo, putBlockStatus)  
  132.           }  
  133.           updatedBlocks += ((blockId, putBlockStatus))  
  134.         }  
  135.       } finally {  
  136.         // If we failed in putting the block to memory/disk, notify other possible readers  
  137.         // that it has failed, and then remove it from the block info map.  
  138.         if (!marked) {  
  139.           // Note that the remove must happen before markFailure otherwise another thread  
  140.           // could've inserted a new BlockInfo before we remove it.  
  141.           blockInfo.remove(blockId)  
  142.           putBlockInfo.markFailure()  
  143.           logWarning(s"Putting block $blockId failed")  
  144.         }  
  145.       }  
  146.     }  
  147.     logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))  
  148.   
  149.     // Either we're storing bytes and we asynchronously started replication, or we're storing  
  150.     // values and need to serialize and replicate them now:  
  151.   
  152.     // 持久化级别定义了 _2 级别,需要将block数据,备份到其他节点  
  153.     if (putLevel.replication > 1) {  
  154.       data match {  
  155.         case ByteBufferValues(bytes) =>  
  156.           if (replicationFuture != null) {  
  157.             Await.ready(replicationFuture, Duration.Inf)  
  158.           }  
  159.         case _ =>  
  160.           val remoteStartTime = System.currentTimeMillis  
  161.           // Serialize the block if not already done  
  162.           if (bytesAfterPut == null) {  
  163.             if (valuesAfterPut == null) {  
  164.               throw new SparkException(  
  165.                 "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")  
  166.             }  
  167.             bytesAfterPut = dataSerialize(blockId, valuesAfterPut)  
  168.           }  
  169.   
  170.           // 进行数据备份  
  171.           replicate(blockId, bytesAfterPut, putLevel)  
  172.           logDebug("Put block %s remotely took %s"  
  173.             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))  
  174.       }  
  175.     }  
  176.   
  177.     BlockManager.dispose(bytesAfterPut)  
  178.   
  179.     if (putLevel.replication > 1) {  
  180.       logDebug("Putting block %s with replication took %s"  
  181.         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))  
  182.     } else {  
  183.       logDebug("Putting block %s without replication took %s"  
  184.         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))  
  185.     }  
  186.   
  187.     updatedBlocks  
  188.   }  

MemoryStore.scala

[plain]

  1. // 优先写入内存,如果内存不足,从内存中移除部分旧数据,再将block存入内存  
  2.   private def tryToPut(  
  3.       blockId: BlockId,  
  4.       value: () => Any,  
  5.       size: Long,  
  6.       deserialized: Boolean,  
  7.       droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {  
  8.   
  9.     /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks  
  10.      * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has  
  11.      * been released, it must be ensured that those to-be-dropped blocks are not double counted  
  12.      * for freeing up more space for another block that needs to be put. Only then the actually  
  13.      * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */  
  14.   
  15.     memoryManager.synchronized {  
  16.       // Note: if we have previously unrolled this block successfully, then pending unroll  
  17.       // memory should be non-zero. This is the amount that we already reserved during the  
  18.       // unrolling process. In this case, we can just reuse this space to cache our block.  
  19.       // The synchronization on `memoryManager` here guarantees that the release and acquire  
  20.       // happen atomically. This relies on the assumption that all memory acquisitions are  
  21.       // synchronized on the same lock.  
  22.   
  23.   
  24.       releasePendingUnrollMemoryForThisTask()  
  25.   
  26.       //判断内存是否存够放入数据  
  27.       val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)  
  28.   
  29.       if (enoughMemory) {  
  30.         // 内存足够  
  31.         // We acquired enough memory for the block, so go ahead and put it  
  32.         val entry = new MemoryEntry(value(), size, deserialized)  
  33.   
  34.         // 同步entries  
  35.         entries.synchronized {  
  36.           entries.put(blockId, entry)  
  37.         }  
  38.         val valuesOrBytes = if (deserialized) "values" else "bytes"  
  39.         logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(  
  40.           blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))  
  41.       } else {  
  42.         // Tell the block manager that we couldn't put it in memory so that it can drop it to  
  43.         // disk if the block allows disk storage.  
  44.         lazy val data = if (deserialized) {  
  45.           Left(value().asInstanceOf[Array[Any]])  
  46.         } else {  
  47.           Right(value().asInstanceOf[ByteBuffer].duplicate())  
  48.         }  
  49.   
  50.         // 如果block允许磁盘存储,就从BlockManager溢出一部分数据,如果不允许持久化到磁盘,数据就丢失了  
  51.         val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)  
  52.         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }  
  53.       }  
  54.       enoughMemory  
  55.     }  
  56.   }  

DiskStore.scala

[plain] 

  1. override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {  
  2.   // So that we do not modify the input offsets !  
  3.   // duplicate does not copy buffer, so inexpensive  
  4.     
  5.   val bytes = _bytes.duplicate()  
  6.   logDebug(s"Attempting to put block $blockId")  
  7.   val startTime = System.currentTimeMillis  
  8.   val file = diskManager.getFile(blockId)  
  9.   val channel = new FileOutputStream(file).getChannel  
  10.   Utils.tryWithSafeFinally {  
  11.     while (bytes.remaining > 0) {  
  12.       channel.write(bytes)  
  13.     }  
  14.   } {  
  15.     channel.close()  
  16.   }  
  17.   val finishTime = System.currentTimeMillis  
  18.   logDebug("Block %s stored as %s file on disk in %d ms".format(  
  19.     file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))  
  20.   PutResult(bytes.limit(), Right(bytes.duplicate()))  
  21. }  

数据在其他节点的BlockManager上备份

BlockManager.scala

[plain] 

  1. private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {  
  2.     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)  
  3.     val numPeersToReplicateTo = level.replication - 1  
  4.     val peersForReplication = new ArrayBuffer[BlockManagerId]  
  5.     val peersReplicatedTo = new ArrayBuffer[BlockManagerId]  
  6.     val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]  
  7.     val tLevel = StorageLevel(  
  8.       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)  
  9.     val startTime = System.currentTimeMillis  
  10.     val random = new Random(blockId.hashCode)  
  11.   
  12.     var replicationFailed = false  
  13.     var failures = 0  
  14.     var done = false  
  15.   
  16.     // Get cached list of peers  
  17.     peersForReplication ++= getPeers(forceFetch = false)  
  18.   
  19.     // Get a random peer. Note that this selection of a peer is deterministic on the block id.  
  20.     // So assuming the list of peers does not change and no replication failures,  
  21.     // if there are multiple attempts in the same node to replicate the same block,  
  22.     // the same set of peers will be selected.  
  23.   
  24.     // 随机获取一个BlockManager  
  25.     def getRandomPeer(): Option[BlockManagerId] = {  
  26.       // If replication had failed, then force update the cached list of peers and remove the peers  
  27.       // that have been already used  
  28.       if (replicationFailed) {  
  29.         peersForReplication.clear()  
  30.         peersForReplication ++= getPeers(forceFetch = true)  
  31.         peersForReplication --= peersReplicatedTo  
  32.         peersForReplication --= peersFailedToReplicateTo  
  33.       }  
  34.       if (!peersForReplication.isEmpty) {  
  35.         Some(peersForReplication(random.nextInt(peersForReplication.size)))  
  36.       } else {  
  37.         None  
  38.       }  
  39.     }  
  40.   
  41.     // One by one choose a random peer and try uploading the block to it  
  42.     // If replication fails (e.g., target peer is down), force the list of cached peers  
  43.     // to be re-fetched from driver and then pick another random peer for replication. Also  
  44.     // temporarily black list the peer for which replication failed.  
  45.     //  
  46.     // This selection of a peer and replication is continued in a loop until one of the  
  47.     // following 3 conditions is fulfilled:  
  48.     // (i) specified number of peers have been replicated to  
  49.     // (ii) too many failures in replicating to peers  
  50.     // (iii) no peer left to replicate to  
  51.     //  
  52.     while (!done) {  
  53.       getRandomPeer() match {  
  54.         case Some(peer) =>  
  55.           try {  
  56.             val onePeerStartTime = System.currentTimeMillis  
  57.             data.rewind()  
  58.             logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")  
  59.   
  60.             // 使用blockTransferService异步将数据写入其他的BlockManager上  
  61.             blockTransferService.uploadBlockSync(  
  62.               peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)  
  63.             logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"  
  64.               .format(System.currentTimeMillis - onePeerStartTime))  
  65.             peersReplicatedTo += peer  
  66.             peersForReplication -= peer  
  67.             replicationFailed = false  
  68.             if (peersReplicatedTo.size == numPeersToReplicateTo) {  
  69.               done = true  // specified number of peers have been replicated to  
  70.             }  
  71.           } catch {  
  72.             case e: Exception =>  
  73.               logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)  
  74.               failures += 1  
  75.               replicationFailed = true  
  76.               peersFailedToReplicateTo += peer  
  77.               if (failures > maxReplicationFailures) { // too many failures in replcating to peers  
  78.                 done = true  
  79.               }  
  80.           }  
  81.         case None => // no peer left to replicate to  
  82.           done = true  
  83.       }  
  84.     }  
  85.     val timeTakeMs = (System.currentTimeMillis - startTime)  
  86.     logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +  
  87.       s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")  
  88.     if (peersReplicatedTo.size < numPeersToReplicateTo) {  
  89.       logWarning(s"Block $blockId replicated to only " +  
  90.         s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")  
  91.     }  
  92.   }  

总结:

①写内存不足的处理机制
②写完以后汇报BlockManagerMaster
③如果要备份随机挑选一个BlocKManager,使用blockTransformInterface将数据传输过去


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
209人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序