鸿 网 互 联 www.68idc.cn

解密SparkStreaming运行机制和架构分析

来源:互联网 作者:佚名 时间:2016-05-15 06:53
解密 Spark Streaming Job 架构和运行机制 解密 Spark Streaming 容错架构和运行机制 作业的生成肯定是一个动态的生成 private [streaming] val graph : DStreamGraph = { if ( isCheckpointPresent ) { cp_. graph .setContext( this ) cp_. graph . restor

  1. 解密Spark Streaming Job架构和运行机制

  2. 解密Spark Streaming容错架构和运行机制

  1. 作业的生成肯定是一个动态的生成

private[streaming]valgraph: DStreamGraph = {
  if(isCheckpointPresent) {
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  
}else{
    require(batchDur_ != null,"Batch duration for StreamingContext cannot be null")
    valnewGraph =newDStreamGraph()
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}

2.

private[streaming]valcheckpointDuration: Duration = {
  if(isCheckpointPresent) cp_.checkpointDurationelsegraph.batchDuration
}

private[streaming]valscheduler=newJobScheduler(this)

private[streaming]valwaiter=newContextWaiter

private[streaming]valprogressListener=newStreamingJobProgressListener(this)

private[streaming]valuiTab: Option[StreamingTab] =
  if(conf.getBoolean("spark.ui.enabled",true)) {
    Some(newStreamingTab(this))
  }else{
    None
  }

3.


defstart():Unit= synchronized {
  if(eventLoop!=null)return// scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop=newEventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected defonReceive(event: JobSchedulerEvent):Unit= processEvent(event)

    override protected defonError(e:Throwable):Unit= reportError("Error in job scheduler",e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for{
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  
} ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker=newReceiverTracker(ssc)
  inputInfoTracker=newInputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

 

1.StreamingContext调用start方法的内部其实是会启动JobSchedulerStart方法,进行消息循环,在JobScheduler start内部会构造JobGeneratorReceiverTacker,并且调用JobGeneratorReceiverTackerstart方法:
      (1). JobGenerator启动后会不断的根据batchDuration生成一个个的Job
      (2). ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到 数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息
   2. 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDDDAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个

元数据:分布式的集群,为了高效的处理数据,一方面是数据本身,另一方面是数据存储在哪里,相当于数据的本身,高效的数据处理方法,是将数据的索引和数据进行区分,元数据的管理是通过BlockTracker来管理的,存数据通过BlockManager来存储管理

  1. Spark Streaming容错机制:

    从第二课中我们发现,Dstream相对于RDD是逻辑层面的,通过Dstream Graph随着时间的流失不断的产生RDD,所以对Dstream的操作就是在固定的时间上操作RDD。所以基于RDD的容错机制同样也实用与Spark Streamign的容错机制

 Spark Streaming的容错要考虑两个方面:

  1. 接收数据的安全性,1.默认情况下都会在两台机器上2.WAL机制,先把数据通过WAL做一个日志记录,kafka支持数据的回放

  2. 执行数据的安全性,Job执行完全靠RDD的引导,Excutor的安全性

  3. Driver级别的安全性,DAG生成的模板,Dstream GraphReciever Tracker, JobGenerator,只需要做一个checkpoint就行

感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制。详细信息请查看
 联系邮箱18610086859@126.com
 电话:18610086859
 QQ:1740415547
 微信号:18610086859

 


网友评论
<