1、BlockManager architecture overview
① The BlockManagerMaster on the driver manages the metadata of the BlockManagers on every node and maintains the status of each block.
② The components inside each node's BlockManager:
DiskStore: reads and writes data on disk
MemoryStore: reads and writes data in memory
BlockManagerWorker: reads and writes data on remote nodes
ConnectionManager: establishes network connections to remote BlockManagers
③ When a BlockManager reads or writes data, for example when persist() is called on an RDD or intermediate data is produced during a job, the data is stored in memory first; if it does not fit in memory, it is spilled to disk (see the sketch below).
④ A shuffle read first looks for the data in the local MemoryStore and DiskStore; if it is not there, it uses ConnectionManager to connect to the remote node's BlockManager and BlockManagerWorker to download the data.
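As a concrete entry point into this behaviour, here is a minimal, hedged sketch of user code that exercises it. The object name and the data are made up for illustration; persist() and the StorageLevel constants are the standard Spark API.
[plain]
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.storage.StorageLevel
- object PersistDemo {
- def main(args: Array[String]): Unit = {
- val sc = new SparkContext(new SparkConf().setAppName("persist-demo").setMaster("local[2]"))
- val rdd = sc.parallelize(1 to 1000000)
- // MEMORY_AND_DISK: blocks go to the MemoryStore first and spill to the DiskStore
- // when memory is insufficient, exactly as described above
- rdd.persist(StorageLevel.MEMORY_AND_DISK)
- // MEMORY_ONLY_2: the "_2" suffix asks the BlockManager to replicate each block
- // to one additional node's BlockManager (an RDD can only have one storage level,
- // which is why this alternative is left commented out)
- // rdd.persist(StorageLevel.MEMORY_ONLY_2)
- println(rdd.count())
- sc.stop()
- }
- }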
2、Source code analysis
① Registering and maintaining BlockManagers
BlockManagerMaster uses BlockManagerMasterEndpoint (an RPC endpoint, which replaced the earlier Actor-based implementation) to manage the metadata of executors and their BlockManagers.
BlockManagerMasterEndpoint.scala
[plain]
- /**
- * Maintains the metadata of every executor's BlockManager: BlockManagerInfo and BlockStatus
- */
- private[spark]
- class BlockManagerMasterEndpoint(
- override val rpcEnv: RpcEnv,
- val isLocal: Boolean,
- conf: SparkConf,
- listenerBus: LiveListenerBus)
- extends ThreadSafeRpcEndpoint with Logging {
- // Mapping from block manager id to the block manager's information.
- // Mapping: BlockManagerId -> BlockManagerInfo
- private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
- // Mapping from executor ID to block manager ID.
- // Mapping: executorId -> BlockManagerId; each executor is associated with exactly one BlockManager
- private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
- // Mapping from block id to the set of block managers that have the block.
- private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- ...
- }
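For context, the two methods analysed below are invoked when this endpoint receives the corresponding RPC messages. The following is a paraphrased sketch of that dispatch, not the verbatim Spark source; the message field names are abbreviated.
[plain]
- // Paraphrased sketch of the message dispatch (not the verbatim source)
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
- // registration request sent from BlockManager.initialize() via the master
- register(blockManagerId, maxMemSize, slaveEndpoint)
- context.reply(true)
- case UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, extSize) =>
- // a block changed on some BlockManager; update the master-side metadata
- context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, extSize))
- // ... other messages: GetLocations, RemoveExecutor, ...
- }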
Registering a BlockManagerInfo
[plain]
- // Register a BlockManager
- private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
- val time = System.currentTimeMillis()
- // Check whether this BlockManager has already been registered
- if (!blockManagerInfo.contains(id)) {
- // Look up the BlockManagerId registered for this executorId
- blockManagerIdByExecutor.get(id.executorId) match {
- // Safety check: if the blockManagerInfo map does not contain this BlockManagerId,
- // the blockManagerIdByExecutor map must not contain its executorId either
- case Some(oldId) =>
- // A block manager of the same executor already exists, so remove it (assumed dead)
- logError("Got two different block manager registrations on same executor - "
- + s" will replace old one $oldId with new one $id")
- // So clean up here: remove everything related to this executorId
- removeExecutor(id.executorId)
- case None =>
- }
- logInfo("Registering block manager %s with %s RAM, %s".format(
- id.hostPort, Utils.bytesToString(maxMemSize), id))
- // Save the executorId -> BlockManagerId mapping
- blockManagerIdByExecutor(id.executorId) = id
- // Create a BlockManagerInfo for this BlockManagerId
- // and save the BlockManagerId -> BlockManagerInfo mapping
- blockManagerInfo(id) = new BlockManagerInfo(
- id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
- }
- listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
- }
Updating a BlockManagerInfo
[plain]
- // Update block info: whenever a block changes on some BlockManager,
- // an updateBlockInfo request is sent to the BlockManagerMaster so it can update its metadata
- private def updateBlockInfo(
- blockManagerId: BlockManagerId,
- blockId: BlockId,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long): Boolean = {
- if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.isDriver && !isLocal) {
- // We intentionally do not register the master (except in local mode),
- // so we should not indicate failure.
- return true
- } else {
- return false
- }
- }
- if (blockId == null) {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- return true
- }
- blockManagerInfo(blockManagerId).updateBlockInfo(
- blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
- // A block may exist on multiple BlockManagers:
- // if its StorageLevel ends in _2, the block is replicated to another BlockManager
- // The blockLocations map keeps, for each blockId, the set of BlockManagerIds holding it
- // Because a Set is used, duplicate locations are removed automatically
- var locations: mutable.HashSet[BlockManagerId] = null
- if (blockLocations.containsKey(blockId)) {
- locations = blockLocations.get(blockId)
- } else {
- locations = new mutable.HashSet[BlockManagerId]
- blockLocations.put(blockId, locations)
- }
- if (storageLevel.isValid) {
- locations.add(blockManagerId)
- } else {
- locations.remove(blockManagerId)
- }
- // Remove the block from master tracking if it has been removed on all slaves.
- if (locations.size == 0) {
- blockLocations.remove(blockId)
- }
- true
- }
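The blockLocations bookkeeping above is plain set manipulation. The following is a small, self-contained sketch of the same idea, independent of Spark; the type aliases and values are made up for illustration.
[plain]
- import scala.collection.mutable
- object BlockLocationsSketch {
- // Simplified stand-ins for BlockId / BlockManagerId, only for this sketch
- type BlockId = String
- type BlockManagerId = String
- val blockLocations = new java.util.HashMap[BlockId, mutable.HashSet[BlockManagerId]]
- def update(blockId: BlockId, bmId: BlockManagerId, levelIsValid: Boolean): Unit = {
- val locations = Option(blockLocations.get(blockId)).getOrElse {
- val s = new mutable.HashSet[BlockManagerId]; blockLocations.put(blockId, s); s
- }
- // A valid level means the block is (still) stored on this BlockManager;
- // the HashSet automatically de-duplicates repeated reports from the same node
- if (levelIsValid) locations.add(bmId) else locations.remove(bmId)
- // Once no BlockManager holds the block, stop tracking it on the master
- if (locations.isEmpty) blockLocations.remove(blockId)
- }
- def main(args: Array[String]): Unit = {
- update("rdd_0_0", "exec-1", levelIsValid = true)
- update("rdd_0_0", "exec-2", levelIsValid = true) // replicated copy (a _2 level)
- update("rdd_0_0", "exec-1", levelIsValid = false) // dropped from exec-1
- println(blockLocations) // the block is now tracked only on exec-2
- }
- }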
[plain]
- private[spark] class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long,
- val slaveEndpoint: RpcEndpointRef)
- extends Logging {
- ...
- // Mapping from block id to its status.
- // Mapping: BlockId -> BlockStatus
- private val _blocks = new JHashMap[BlockId, BlockStatus]
- ...
- def updateBlockInfo(
- blockId: BlockId,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long,
- externalBlockStoreSize: Long) {
- updateLastSeenMs()
- // Check whether this BlockManager already holds the block
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already.
- val blockStatus: BlockStatus = _blocks.get(blockId)
- val originalLevel: StorageLevel = blockStatus.storageLevel
- val originalMemSize: Long = blockStatus.memSize
- // If the old storage level used memory, add the old in-memory size back to the remaining memory
- if (originalLevel.useMemory) {
- _remainingMem += originalMemSize
- }
- }
- // Create a BlockStatus for the block and update the memory accounting according to the storage level
- if (storageLevel.isValid) {
- /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
- * The memSize here indicates the data size in or dropped from memory,
- * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
- * and the diskSize here indicates the data size in or dropped to disk.
- * They can be both larger than 0, when a block is dropped from memory to disk.
- * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
- var blockStatus: BlockStatus = null
- if (storageLevel.useMemory) {
- blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
- _blocks.put(blockId, blockStatus)
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
- Utils.bytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
- _blocks.put(blockId, blockStatus)
- logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
- }
- if (storageLevel.useOffHeap) {
- blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
- _blocks.put(blockId, blockStatus)
- logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
- }
- if (!blockId.isBroadcast && blockStatus.isCached) {
- _cachedBlocks += blockId
- }
- }
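The memory accounting above follows a simple pattern: first give the old in-memory size back, then charge the new one. Below is a standalone sketch of that bookkeeping, with made-up types and numbers rather than the real BlockStatus.
[plain]
- object MemoryAccountingSketch {
- case class Status(useMemory: Boolean, memSize: Long) // stand-in for BlockStatus
- var remainingMem: Long = 1000L
- val blocks = new java.util.HashMap[String, Status]
- def updateBlock(blockId: String, useMemory: Boolean, memSize: Long): Unit = {
- // 1) If the block was already held in memory, return its old size to the pool
- Option(blocks.get(blockId)).filter(_.useMemory).foreach(old => remainingMem += old.memSize)
- // 2) Record the new status and charge the new size if it stays in memory
- blocks.put(blockId, Status(useMemory, memSize))
- if (useMemory) remainingMem -= memSize
- }
- def main(args: Array[String]): Unit = {
- updateBlock("rdd_1_0", useMemory = true, memSize = 300) // remainingMem: 1000 -> 700
- updateBlock("rdd_1_0", useMemory = true, memSize = 100) // 300 returned, 100 charged
- println(remainingMem) // 900
- }
- }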
② Data transfer between BlockManagers
BlockManager.scala
Initializing the components
[plain]
- private[spark] class BlockManager(
- executorId: String,
- rpcEnv: RpcEnv,
- val master: BlockManagerMaster,
- defaultSerializer: Serializer,
- val conf: SparkConf,
- memoryManager: MemoryManager,
- mapOutputTracker: MapOutputTracker,
- shuffleManager: ShuffleManager,
- blockTransferService: BlockTransferService,
- securityManager: SecurityManager,
- numUsableCores: Int)
- extends BlockDataManager with Logging {
- val diskBlockManager = new DiskBlockManager(this, conf)
- // Each BlockManager maintains its own blockId -> BlockInfo map
- // A BlockInfo represents one block; its main role is to serve as the synchronization monitor for multi-threaded access to that block
- private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
- private[spark] val memoryStore = new MemoryStore(this, memoryManager)
- private[spark] val diskStore = new DiskStore(this, diskBlockManager)
- def initialize(appId: String): Unit = {
- // Initialize blockTransferService, which is used to transfer block data to and from remote nodes
- blockTransferService.init(this)
- shuffleClient.init(appId)
- // Create the BlockManagerId for this BlockManager
- // A BlockManager is uniquely identified by the executor it runs in, plus host and port
- blockManagerId = BlockManagerId(
- executorId, blockTransferService.hostName, blockTransferService.port)
- shuffleServerId = if (externalShuffleServiceEnabled) {
- logInfo(s"external shuffle service port = $externalShuffleServicePort")
- BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
- } else {
- blockManagerId
- }
- // Register this BlockManager by sending a message
- // to the BlockManagerMasterEndpoint via the master reference
- master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
- // Register Executors' configuration with the local shuffle service, if one should exist.
- if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
- registerWithExternalShuffleServer()
- }
- }
- ...
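initialize() chooses between blockManagerId and a separate shuffleServerId depending on whether the external shuffle service is enabled. That switch is driven by standard Spark settings; here is a minimal sketch with illustrative values (7337 is the documented default port).
[plain]
- import org.apache.spark.SparkConf
- object ShuffleServiceConfSketch {
- def main(args: Array[String]): Unit = {
- // When spark.shuffle.service.enabled is true, shuffle blocks are served by the
- // external shuffle service, so shuffleServerId points at the service port
- // instead of the executor's blockTransferService port
- val conf = new SparkConf()
- .set("spark.shuffle.service.enabled", "true")
- .set("spark.shuffle.service.port", "7337")
- println(conf.get("spark.shuffle.service.enabled")) // true
- }
- }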
(1) Reading data locally
BlockManager.scala
Reading data
[plain]
- // Get a block from local storage
- private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
- // Look up the BlockInfo for this block; it is used as the lock below
- val info = blockInfo.get(blockId).orNull
- if (info != null) {
- // All access to a block is synchronized on its BlockInfo,
- // which acts as the monitor for concurrent access to the block
- info.synchronized {
- // Double check to make sure the block is still there. There is a small chance that the
- // block has been removed by removeBlock (which also synchronizes on the blockInfo object).
- // Note that this only checks metadata tracking. If user intentionally deleted the block
- // on disk or from off heap storage without using removeBlock, this conditional check will
- // still pass but eventually we will get an exception because we can't find the block.
- if (blockInfo.get(blockId).isEmpty) {
- logWarning(s"Block $blockId had been removed")
- return None
- }
- // If another thread is writing the block, wait for it to become ready.
- // If another thread is writing the block we want, wait until it is ready;
- // if the write ultimately failed, give up and return None
- if (!info.waitForReady()) {
- // If we get here, the block write failed.
- logWarning(s"Block $blockId was marked as failure.")
- return None
- }
- val level = info.level
- logDebug(s"Level for block $blockId is $level")
- // Look for the block in memory
- if (level.useMemory) {
- logDebug(s"Getting block $blockId from memory")
- val result = if (asBlockResult) {
- memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
- } else {
- memoryStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in memory")
- }
- }
- // Look for the block in external block store
- if (level.useOffHeap) {
- logDebug(s"Getting block $blockId from ExternalBlockStore")
- if (externalBlockStore.contains(blockId)) {
- val result = if (asBlockResult) {
- externalBlockStore.getValues(blockId)
- .map(new BlockResult(_, DataReadMethod.Memory, info.size))
- } else {
- externalBlockStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in ExternalBlockStore")
- }
- }
- }
- // Look for block on disk, potentially storing it back in memory if required
- if (level.useDisk) {
- logDebug(s"Getting block $blockId from disk")
- val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
- case Some(b) => b
- case None =>
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
- }
- assert(0 == bytes.position())
- if (!level.useMemory) {
- // If the block shouldn't be stored in memory, we can just return it
- if (asBlockResult) {
- // Deserialize the bytes
- return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
- info.size))
- } else {
- return Some(bytes)
- }
- } else {
- // Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asBlockResult) {
- /* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized", or if it should be cached as objects in memory
- * but we only requested its serialized bytes. */
- memoryStore.putBytes(blockId, bytes.limit, () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we cannot
- // put it into MemoryStore, copyForMemory should not be created. That's why this
- // action is put into a `() => ByteBuffer` and created lazily.
- // When the level uses both disk and memory, after reading the block from disk
- // we try to put a copy back into memory
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- })
- bytes.rewind()
- }
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- val values = dataDeserialize(blockId, bytes)
- if (level.deserialized) {
- // Cache the values before returning them
- val putResult = memoryStore.putIterator(
- blockId, values, level, returnValues = true, allowPersistToDisk = false)
- // The put may or may not have succeeded, depending on whether there was enough
- // space to unroll the block. Either way, the put here should return an iterator.
- putResult.data match {
- case Left(it) =>
- return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
- case _ =>
- // This only happens if we dropped the values back to disk (which is never)
- throw new SparkException("Memory store did not return an iterator!")
- }
- } else {
- return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
- }
- }
- }
- }
- }
- } else {
- logDebug(s"Block $blockId not registered locally")
- }
- None
- }
MemoryStore.scala
[plain]
- override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
- // Synchronized for concurrent access
- val entry = entries.synchronized {
- // Try to get the entry from the in-memory map
- entries.get(blockId)
- }
- if (entry == null) {
- // Not found in memory: return None
- None
- } else if (entry.deserialized) {
- // The entry holds deserialized values: serialize them before returning
- Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
- } else {
- // The entry already holds serialized bytes: return them directly
- Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
- }
- }
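The repeated "Doesn't actually copy the data" comments refer to ByteBuffer.duplicate(): the duplicate shares the underlying bytes but has its own position and limit, so the store can hand out buffers without copying them. A quick standalone illustration (plain Java NIO, no Spark involved):
[plain]
- import java.nio.ByteBuffer
- object DuplicateSketch {
- def main(args: Array[String]): Unit = {
- val original = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
- val dup = original.duplicate()
- dup.get() // advances only the duplicate's position
- println((original.position(), dup.position())) // (0,1): positions are independent
- dup.put(0, 9.toByte) // ...but the content is shared
- println(original.get(0)) // 9
- }
- }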
getValues() is the counterpart of getBytes(): it returns the deserialized values (an iterator of objects) rather than raw bytes.
[plain]
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- val entry = entries.synchronized {
- entries.get(blockId)
- }
- if (entry == null) {
- None
- } else if (entry.deserialized) {
- Some(entry.value.asInstanceOf[Array[Any]].iterator)
- } else {
- // The entry holds serialized bytes: deserialize them
- val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
- Some(blockManager.dataDeserialize(blockId, buffer))
- }
- }
DiskStore.scala
[plain]
- private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
- // Use Java NIO to read the file
- val channel = new RandomAccessFile(file, "r").getChannel
- Utils.tryWithSafeFinally {
- // For small files, directly read rather than memory map
- if (length < minMemoryMapBytes) {
- val buf = ByteBuffer.allocate(length.toInt)
- channel.position(offset)
- while (buf.remaining() != 0) {
- if (channel.read(buf) == -1) {
- throw new IOException("Reached EOF before filling buffer\n" +
- s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
- }
- }
- buf.flip()
- Some(buf)
- } else {
- Some(channel.map(MapMode.READ_ONLY, offset, length))
- }
- } {
- channel.close()
- }
- }
[plain]
- override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
- getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
- }
Summary:
① Data is read from memory first, then from disk.
② If the storage level uses both memory and disk, data read from disk is put back into memory.
③ Reads are synchronized on the block's BlockInfo, which keeps concurrent access to a block safe.
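Below is a simplified, Spark-independent sketch of this read priority (memory first, disk as fallback, optional re-caching); the maps here are just stand-ins for MemoryStore and DiskStore.
[plain]
- object LocalReadSketch {
- val memoryStore = scala.collection.mutable.Map[String, Array[Byte]]()
- val diskStore = scala.collection.mutable.Map[String, Array[Byte]]()
- def getLocal(blockId: String, useMemory: Boolean, useDisk: Boolean): Option[Array[Byte]] = {
- if (useMemory) {
- memoryStore.get(blockId) match {
- case some @ Some(_) => return some // hit in memory: done
- case None => // fall through to disk
- }
- }
- if (useDisk) {
- diskStore.get(blockId).map { bytes =>
- if (useMemory) memoryStore(blockId) = bytes // put the disk copy back into memory
- bytes
- }
- } else None
- }
- def main(args: Array[String]): Unit = {
- diskStore("rdd_2_0") = Array[Byte](1, 2, 3)
- getLocal("rdd_2_0", useMemory = true, useDisk = true)
- println(memoryStore.contains("rdd_2_0")) // true: re-cached from disk
- }
- }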
(2) Reading data remotely
BlockManager.scala
[plain]
- private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
- require(blockId != null, "BlockId is null")
- // Ask the BlockManagerMaster for the locations (BlockManagerIds) of this block,
- // then shuffle them randomly
- val locations = Random.shuffle(master.getLocations(blockId))
- var numFetchFailures = 0
- // Try each BlockManager location in turn
- for (loc <- locations) {
- logDebug(s"Getting remote block $blockId from $loc")
- // Use blockTransferService to fetch the block data from the remote node (fetchBlockSync blocks until the data arrives)
- val data = try {
- blockTransferService.fetchBlockSync(
- loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
- } catch {
- case NonFatal(e) =>
- numFetchFailures += 1
- if (numFetchFailures == locations.size) {
- // An exception is thrown while fetching this block from all locations
- throw new BlockFetchException(s"Failed to fetch block from" +
- s" ${locations.size} locations. Most recent failure cause:", e)
- } else {
- // This location failed, so we retry fetch from a different one by returning null here
- logWarning(s"Failed to fetch remote block $blockId " +
- s"from $loc (failed attempt $numFetchFailures)", e)
- null
- }
- }
- if (data != null) {
- if (asBlockResult) {
- return Some(new BlockResult(
- // Deserialize
- dataDeserialize(blockId, data),
- DataReadMethod.Network,
- data.limit()))
- } else {
- return Some(data)
- }
- }
- logDebug(s"The value of block $blockId is null")
- }
- logDebug(s"Block $blockId not found")
- None
- }
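The retry pattern in doGetRemote() (shuffle the locations, try each one, give up only after all of them have failed) is easy to see in isolation. Here is a Spark-independent sketch with a made-up fetch function:
[plain]
- import scala.util.{Failure, Random, Success, Try}
- object RemoteFetchSketch {
- // Hypothetical fetch: only "node-b" actually has the block in this sketch
- def fetch(location: String, blockId: String): Array[Byte] =
- if (location == "node-b") Array[Byte](42) else throw new java.io.IOException(s"$location down")
- def getRemote(blockId: String, locations: Seq[String]): Option[Array[Byte]] = {
- val shuffled = Random.shuffle(locations) // spread load across the replicas
- var failures = 0
- for (loc <- shuffled) {
- Try(fetch(loc, blockId)) match {
- case Success(data) => return Some(data)
- case Failure(e) =>
- failures += 1
- // Only when every location has failed do we give up (Spark throws here)
- if (failures == shuffled.size) throw new RuntimeException(s"all $failures locations failed", e)
- }
- }
- None
- }
- def main(args: Array[String]): Unit =
- println(getRemote("rdd_3_0", Seq("node-a", "node-b", "node-c")).map(_.toList))
- }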
(3) Writing data
BlockManager.scala
[plain]
- private def doPut(
- blockId: BlockId,
- data: BlockValues,
- level: StorageLevel,
- tellMaster: Boolean = true,
- effectiveStorageLevel: Option[StorageLevel] = None)
- : Seq[(BlockId, BlockStatus)] = {
- require(blockId != null, "BlockId is null")
- require(level != null && level.isValid, "StorageLevel is null or invalid")
- effectiveStorageLevel.foreach { level =>
- require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
- }
- // Return value
- val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- /* Remember the block's storage level so that we can correctly drop it to disk if it needs
- * to be dropped right after it got put into memory. Note, however, that other threads will
- * not be able to get() this block until we call markReady on its BlockInfo. */
- // Create a BlockInfo for the block to be written and put it into the blockInfo map
- val putBlockInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
- // Do atomically !
- val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
- if (oldBlockOpt.isDefined) {
- if (oldBlockOpt.get.waitForReady()) {
- logWarning(s"Block $blockId already exists on this machine; not re-adding it")
- return updatedBlocks
- }
- // TODO: So the block info exists - but previous attempt to load it (?) failed.
- // What do we do now ? Retry on it ?
- oldBlockOpt.get
- } else {
- tinfo
- }
- }
- val startTimeMs = System.currentTimeMillis
- /* If we're storing values and we need to replicate the data, we'll want access to the values,
- * but because our put will read the whole iterator, there will be no values left. For the
- * case where the put serializes data, we'll remember the bytes, above; but for the case where
- * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
- var valuesAfterPut: Iterator[Any] = null
- // Ditto for the bytes after the put
- var bytesAfterPut: ByteBuffer = null
- // Size of the block in bytes
- var size = 0L
- // The level we actually use to put the block
- val putLevel = effectiveStorageLevel.getOrElse(level)
- // If we're storing bytes, then initiate the replication before storing them locally.
- // This is faster as data is already serialized and ready to send.
- val replicationFuture = data match {
- case b: ByteBufferValues if putLevel.replication > 1 =>
- // Duplicate doesn't copy the bytes, but just creates a wrapper
- val bufferView = b.buffer.duplicate()
- Future {
- // This is a blocking action and should run in futureExecutionContext which is a cached
- // thread pool
- replicate(blockId, bufferView, putLevel)
- }(futureExecutionContext)
- case _ => null
- }
- // Lock the BlockInfo to synchronize concurrent access
- putBlockInfo.synchronized {
- logTrace("Put for block %s took %s to get into synchronized block"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- var marked = false
- try {
- // returnValues - Whether to return the values put
- // blockStore - The type of storage to put these values into
- // Choose a BlockStore (MemoryStore, DiskStore, externalBlockStore) according to the storage level
- val (returnValues, blockStore: BlockStore) = {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- (true, memoryStore)
- } else if (putLevel.useOffHeap) {
- // Use external block store
- (false, externalBlockStore)
- } else if (putLevel.useDisk) {
- // Don't get back the bytes from put unless we replicate them
- (putLevel.replication > 1, diskStore)
- } else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying storage level!")
- }
- }
- // Actually put the values
- // Put the data into the chosen store, depending on how the values are represented
- val result = data match {
- case IteratorValues(iterator) =>
- blockStore.putIterator(blockId, iterator, putLevel, returnValues)
- case ArrayValues(array) =>
- blockStore.putArray(blockId, array, putLevel, returnValues)
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- blockStore.putBytes(blockId, bytes, putLevel)
- }
- size = result.size
- result.data match {
- case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
- case Right (newBytes) => bytesAfterPut = newBytes
- case _ =>
- }
- // Keep track of which blocks are dropped from memory
- if (putLevel.useMemory) {
- result.droppedBlocks.foreach { updatedBlocks += _ }
- }
- // Get the BlockStatus of the block just written
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
- if (putBlockStatus.storageLevel != StorageLevel.NONE) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // let other threads read it, and tell the master about it.
- marked = true
- putBlockInfo.markReady(size)
- if (tellMaster) {
- // Report the newly written block to the BlockManagerMasterEndpoint
- // so the master's block metadata stays in sync
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
- }
- updatedBlocks += ((blockId, putBlockStatus))
- }
- } finally {
- // If we failed in putting the block to memory/disk, notify other possible readers
- // that it has failed, and then remove it from the block info map.
- if (!marked) {
- // Note that the remove must happen before markFailure otherwise another thread
- // could've inserted a new BlockInfo before we remove it.
- blockInfo.remove(blockId)
- putBlockInfo.markFailure()
- logWarning(s"Putting block $blockId failed")
- }
- }
- }
- logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- // Either we're storing bytes and we asynchronously started replication, or we're storing
- // values and need to serialize and replicate them now:
- // If the storage level ends in _2, the block has to be replicated to other nodes
- if (putLevel.replication > 1) {
- data match {
- case ByteBufferValues(bytes) =>
- if (replicationFuture != null) {
- Await.ready(replicationFuture, Duration.Inf)
- }
- case _ =>
- val remoteStartTime = System.currentTimeMillis
- // Serialize the block if not already done
- if (bytesAfterPut == null) {
- if (valuesAfterPut == null) {
- throw new SparkException(
- "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
- }
- bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
- }
- // Replicate the data
- replicate(blockId, bytesAfterPut, putLevel)
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
- }
- }
- BlockManager.dispose(bytesAfterPut)
- if (putLevel.replication > 1) {
- logDebug("Putting block %s with replication took %s"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- } else {
- logDebug("Putting block %s without replication took %s"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- }
- updatedBlocks
- }
MemoryStore.scala
[plain]
- // Put the block into memory first; if memory is insufficient, some existing blocks may be dropped to make room
- private def tryToPut(
- blockId: BlockId,
- value: () => Any,
- size: Long,
- deserialized: Boolean,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
- * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
- * been released, it must be ensured that those to-be-dropped blocks are not double counted
- * for freeing up more space for another block that needs to be put. Only then the actually
- * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
- memoryManager.synchronized {
- // Note: if we have previously unrolled this block successfully, then pending unroll
- // memory should be non-zero. This is the amount that we already reserved during the
- // unrolling process. In this case, we can just reuse this space to cache our block.
- // The synchronization on `memoryManager` here guarantees that the release and acquire
- // happen atomically. This relies on the assumption that all memory acquisitions are
- // synchronized on the same lock.
- releasePendingUnrollMemoryForThisTask()
- // Check whether there is enough memory to store the data (this may evict other blocks)
- val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
- if (enoughMemory) {
- // Enough memory: store the block
- // We acquired enough memory for the block, so go ahead and put it
- val entry = new MemoryEntry(value(), size, deserialized)
- // Synchronize on entries
- entries.synchronized {
- entries.put(blockId, entry)
- }
- val valuesOrBytes = if (deserialized) "values" else "bytes"
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
- blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
- } else {
- // Tell the block manager that we couldn't put it in memory so that it can drop it to
- // disk if the block allows disk storage.
- lazy val data = if (deserialized) {
- Left(value().asInstanceOf[Array[Any]])
- } else {
- Right(value().asInstanceOf[ByteBuffer].duplicate())
- }
- // If the block's level allows disk storage, drop it from memory to disk; otherwise the data is simply lost
- val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
- droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
- }
- enoughMemory
- }
- }
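The decision in the else branch (drop to disk if the level allows it, otherwise discard) is the core of the "memory first, disk as fallback" behaviour. A simplified, Spark-independent sketch of that decision:
[plain]
- object TryToPutSketch {
- var freeMemory: Long = 100L
- val memoryStore = scala.collection.mutable.Map[String, Array[Byte]]()
- val diskStore = scala.collection.mutable.Map[String, Array[Byte]]()
- def tryToPut(blockId: String, data: Array[Byte], allowDisk: Boolean): Boolean = {
- if (data.length <= freeMemory) {
- // Enough memory: store it and charge the space
- memoryStore(blockId) = data
- freeMemory -= data.length
- true
- } else if (allowDisk) {
- // Not enough memory but the level includes disk: drop it to disk
- diskStore(blockId) = data
- false
- } else {
- // MEMORY_ONLY and no room: the cached copy is simply lost
- // (the RDD can still be recomputed from its lineage)
- false
- }
- }
- def main(args: Array[String]): Unit = {
- println(tryToPut("a", new Array[Byte](80), allowDisk = true)) // true, stored in memory
- println(tryToPut("b", new Array[Byte](80), allowDisk = true)) // false, spilled to disk
- println(diskStore.keySet) // Set(b)
- }
- }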
DiskStore.scala
[plain]
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
- // So that we do not modify the input offsets !
- // duplicate does not copy buffer, so inexpensive
- val bytes = _bytes.duplicate()
- logDebug(s"Attempting to put block $blockId")
- val startTime = System.currentTimeMillis
- val file = diskManager.getFile(blockId)
- val channel = new FileOutputStream(file).getChannel
- Utils.tryWithSafeFinally {
- while (bytes.remaining > 0) {
- channel.write(bytes)
- }
- } {
- channel.close()
- }
- val finishTime = System.currentTimeMillis
- logDebug("Block %s stored as %s file on disk in %d ms".format(
- file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
- PutResult(bytes.limit(), Right(bytes.duplicate()))
- }
Replicating the data to BlockManagers on other nodes
BlockManager.scala
[plain]
- private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
- val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
- val numPeersToReplicateTo = level.replication - 1
- val peersForReplication = new ArrayBuffer[BlockManagerId]
- val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
- val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
- val tLevel = StorageLevel(
- level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
- val startTime = System.currentTimeMillis
- val random = new Random(blockId.hashCode)
- var replicationFailed = false
- var failures = 0
- var done = false
- // Get cached list of peers
- peersForReplication ++= getPeers(forceFetch = false)
- // Get a random peer. Note that this selection of a peer is deterministic on the block id.
- // So assuming the list of peers does not change and no replication failures,
- // if there are multiple attempts in the same node to replicate the same block,
- // the same set of peers will be selected.
- // Pick a random peer BlockManager
- def getRandomPeer(): Option[BlockManagerId] = {
- // If replication had failed, then force update the cached list of peers and remove the peers
- // that have been already used
- if (replicationFailed) {
- peersForReplication.clear()
- peersForReplication ++= getPeers(forceFetch = true)
- peersForReplication --= peersReplicatedTo
- peersForReplication --= peersFailedToReplicateTo
- }
- if (!peersForReplication.isEmpty) {
- Some(peersForReplication(random.nextInt(peersForReplication.size)))
- } else {
- None
- }
- }
- // One by one choose a random peer and try uploading the block to it
- // If replication fails (e.g., target peer is down), force the list of cached peers
- // to be re-fetched from driver and then pick another random peer for replication. Also
- // temporarily black list the peer for which replication failed.
- //
- // This selection of a peer and replication is continued in a loop until one of the
- // following 3 conditions is fulfilled:
- // (i) specified number of peers have been replicated to
- // (ii) too many failures in replicating to peers
- // (iii) no peer left to replicate to
- //
- while (!done) {
- getRandomPeer() match {
- case Some(peer) =>
- try {
- val onePeerStartTime = System.currentTimeMillis
- data.rewind()
- logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
- // Use blockTransferService to upload the data to the peer BlockManager (uploadBlockSync blocks until the upload finishes)
- blockTransferService.uploadBlockSync(
- peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
- logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
- .format(System.currentTimeMillis - onePeerStartTime))
- peersReplicatedTo += peer
- peersForReplication -= peer
- replicationFailed = false
- if (peersReplicatedTo.size == numPeersToReplicateTo) {
- done = true // specified number of peers have been replicated to
- }
- } catch {
- case e: Exception =>
- logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
- failures += 1
- replicationFailed = true
- peersFailedToReplicateTo += peer
- if (failures > maxReplicationFailures) { // too many failures in replicating to peers
- done = true
- }
- }
- case None => // no peer left to replicate to
- done = true
- }
- }
- val timeTakeMs = (System.currentTimeMillis - startTime)
- logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
- s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
- if (peersReplicatedTo.size < numPeersToReplicateTo) {
- logWarning(s"Block $blockId replicated to only " +
- s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
- }
- }
Summary:
① When there is not enough memory for a write, blocks are dropped from memory (to disk if the storage level allows it, otherwise they are lost).
② After a write, the block status is reported to the BlockManagerMaster.
③ If replication is required, a random peer BlockManager is picked and the data is transferred to it through blockTransferService.
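To exercise the replication path, a block only needs a storage level whose replication factor is greater than 1. A minimal hedged sketch using the standard Spark API (submit it to a real cluster; in local mode there is only one BlockManager, so there is no peer to replicate to):
[plain]
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.storage.StorageLevel
- object ReplicationDemo {
- def main(args: Array[String]): Unit = {
- // spark.storage.maxReplicationFailures is the setting read by replicate():
- // how many failed peers to tolerate before giving up
- val conf = new SparkConf().setAppName("replication-demo")
- .set("spark.storage.maxReplicationFailures", "1")
- val sc = new SparkContext(conf)
- // The "_2" suffix means replication = 2, so replicate() copies each block
- // to one peer BlockManager via blockTransferService
- sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_AND_DISK_2).count()
- sc.stop()
- }
- }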