博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第9课筆記
阅读量:6372 次
发布时间:2019-06-23

本文共 3558 字,大约阅读时间需要 11 分钟。

hot3.png

輸入 輸出是 整個Spark Streaming 的最重要的一個環節,因為我們需要Input Data,我們就是用 socketTextStream 的角度出發,來研究DStream.

為什麼要有 Receiver,因為 Receiver 負責不斷地接收外部的數據,並且報給我們的 Driver 中,這樣我們每個 BatchDuration 會跟據我們報的數據,然後就生成不同的Job 去執行我們 RDD 的操作。

 

 

SocketTextStream 或者是 Kafka 或者是 Flume 的數據來源構建了的 DStream,它們都是 InputDStream, 所謂的 InputDStream 就是輸入數據來源。輸入數據來源是整個應用程序至關重要的一個環節,因為如果你沒有輸入數據來源,那你就無法產生數據,然後在每個 BatchDuration 去生成 DStream 的時候就無法獲取數據,也就沒有後面所說的流處理等相關的其他的故事。

 

 

我們就是用 socketTextStream 的角度出發,來研究DStream,關鍵點是研究 Receiver 的起動過程,Receiver 的啟動是屬於應用程式啟動的一部份,因為你如果不是啟動的時候啟動 Receiver , 它沒法實例化就沒法接收數據。它是隨著應用程序的啟動而啟動。

 

 

我們怎麼 去啟動 Receiver?

 

你可以認為 InputDstream 跟 Receiver 是1對1的關係。每個 Receiver 就好像不同的 RDD 的 partition。每一個Receiver 啟都有一個作業去負責。Receiver 的啟動從SparkCore的角度來看,它不會知道你在啟動的是一個 Receiver, SparkCore 看見所有的啟動都是應用程序的行為,大家自己作實驗的時候一般是一個 Receiver ,但你可以完全有多個 Receiver。

 

你可以認為 Receiver 和 InputDstraem 是一對一的關係,我啟動一個Job, Job里面有 RDD transformation, Job 里面只有一個分片,這個特出的地方就是這個 partition 只有一個成員,就是我們啟動的一個 Receiver,

 

再次看一下DStreamGraph, 里面第一個成員是 InputDStream,只要創建多個InputDstream,不同的數據來源比如說Flume/ Kafka 就會有不同的InputDStream。OutputStream 可以有多個不同的

 

想一想這個有沒有問題,如果我有多個 DStream, 然後創建了多個 Receiver ,每個Receiver就等如一個partition, 我們啟動 Receiver 的時候就是在不同的機器上啟動 Receiver,由於SparkCore 感覺不到你的獨特性,它會基於普通的 Job 的調度來講,普通的 Job 的調度的角度是不是首先由一種情況,我們可能會在同一個 Executor 中啟動多個 Receiver 

 

第一個結果有可能會導致負载不均衡,因為你只是一個 RDD 的不同 partition 。啟動這個不同的 DStream, 就是 Receiver , 就是 DStream 代表的,啟動應用程序不同的 Receiver, 我們是採用 RDD 的不同 partition 代表不同的 Receiver, 然後啟動的時候不同的 partition 就是執行層面就是不同的 Task. 每個 Task 在啟動的時候,就真正的啟動一個 Receiver 。

啟動可能不成功。

 

第二個結果有可能啟動 Receiver 失敗,只有集群存在,啟動Receiver 就不應該啟動失敗。因為它是基於 RDD 的分片就代表不同的 Receiver,Receiver 是基於作為一個分片,其實分片可能是業務邏輯的東西。然後它跟據任務的執行這個 Receiver 就啟動起來了。到Executor上啟動Receiver 的時候,是不是有可能出現故障而導致失敗。它可能還有 stage retry 的次數,但是 task 失敗會對導致整個 Job 的失敗。你運行過程中也有可能基於每個 task 去啟動 Executor 這種方式,這個 task 也有可能失敗。我們從運行的過程中它也有可能運行失敗。因為 Receiver 是 7x24 的不斷在運行,它就有可能出故障。 Receiver 構造的只是有內存中弄一個數據結構然後發出一個 Job。Receiver 是不可能失敗。

 

 

在這里就只返回一個對象而已 new SocketInputDStream

 

 

 

首先是從 ssc.start( ) 的調度開始,那里有receiverTracker.start( ) 方法

這里是 Scheduler層次, 這啟動一個線程,它只所以啟動多一條線程是因為一方面是線程本地的一些屬性,另外一方面是不要阻塞主線程。因為主線程需要負責跟 Executor, or Driver 返回信息。Receiver 的啟動是在 jobScheduler.start( ) 方法啟動的。

 

首先 getInputDStream, 它是在 ReceiverTracker 中啟動,RecevierTracker 監控整個集群的所有 Receiver, 向 receiverTracker 報消息。

 

Receiver 的啟動是依據於你的輸入輸出流的,判斷 receiverInputStream 是否為空?如果沒有 InputDStream 的話,它就不會啟動 Receiver 

 

它必需不為空,你才能啟動Receiver, Receiver 的啟動是基於輸入輸出流,如果沒有 InputStream 的話,它就不會啟動 Receiver。

 

 

ReceiverInputDStream 是在 Driver 端,一個receiverInputStream 產生一個 Receiver 

這里是50個線程,20個並發度

 

確認一下 receiver 要運行在那個 executor 里

 

 

在這里,它這邊起啟Receiver 不會重試連接,因為 else 是空的。

 

 

它為了啟動 Receiver, 啟動了一個 Spark 的作業。這個作業是啟動一個 Receiver 還是啟動所有的 Receiver 呢?

答案:你可以看看receiverRDD 里,只有一個,每個 receiver 的啟動都會啟動一個作業。我們可以一步一步去追蹤。 從源碼角度可以看它只有一個 receiverId

receiverRDD.setName(s”Receiver $receiverId”)

 

然後再看看那個receiverid 的參數是從那里來的,是在調度 receive 方法時候有一個 for loop 來獲取的參數,然後每一個又會調用 startReceiver 方法,得出的結果就是每次Spark Streaming 要啟動 Receiver 的時候,都會有一個作業來負責,其內部是啟動一個作業來執行的。

 

 

 

 

 

這應該是最多程度的負均衡,因為它每次都會,但是這東西可能失敗,

 

這里會判斷成功跟失敗, 如果這個作業失敗的話

 

 

 

默認情況下是 True

 

 

它一旦失敗啦就會發消息過去,發消息過去它會不會認為是作業失敗呀,你假設說失敗,再次重新連接。它會確保有集群上一定會起動成功的。就是task 重試的次數不受限制。

它每個 Job 就只有一個任務,所以只會在一台機器上運行。

 

 

第一,你的Receiver 啟動失敗可以導致整個應用程式失敗

第二,你在運行的過程中,你有可能有幾個 Receiver,

第三,如果你失敗啦,那十幾個 Receiver 都會在 Executor 上運行

第四,可以重試,但重試會有代價。

 

如果Receiver 啟動失敗 會有什麼結果?

 

這里 shouldStartReciver 是看看 receiver 啟動了沒有。這里默認情況是 True, 如果作業失敗的話,它會自動重新起啟。不斷的 restart 直到成功。

 

 

shouldStartReciever 是一個boolean 值,查查 isTrackerStopping or isTrackerStopped 是不是 (不是 

 

 

 

它在調用 start( )方法時把參數變為 Started. 

 

每個機器上會有一係列可選機器的列表,它會當第一次receiver 失敗,它會重新啟動減掉 之前的一個,

 

 

 

 

 

 

 

 

 

 

 

 

 

转载于:https://my.oschina.net/jcchoiling/blog/679303

你可能感兴趣的文章
Edge 浏览器奇葩 bug:“123456”打印成“114447”
查看>>
Sirius —— 开源版的 Siri ,由 Google 支持
查看>>
《OpenGL ES应用开发实践指南:Android卷》—— 2.7 小结
查看>>
《Windows Server 2012活动目录管理实践》——第 2 章 部署第一台域控制器2.1 案例任务...
查看>>
Java Date Time 教程-时间测量
查看>>
Selector.wakeup实现注记
查看>>
《Java EE 7精粹》—— 第1章 Java EE 1.1 简介
查看>>
《Exchange Server 2013 SP1管理实践》——导读
查看>>
syslog:类Unix系统常用的log服务
查看>>
使用Annotation设计持久层
查看>>
深入实践Spring Boot2.4.1 Neo4j依赖配置
查看>>
Zen Cart 如何添加地址栏上的小图标
查看>>
SecureCrt 连接Redhat linux
查看>>
[NHibernate]持久化类(Persistent Classes)
查看>>
如何在Hive中使用Json格式数据
查看>>
linux如何恢复被删除的热文件
查看>>
Eclipse(MyEclipse) 自动补全
查看>>
Struts2中dispatcher与redirect的区别
查看>>
zabbix agentd configure
查看>>
地图点聚合优化方案
查看>>