クラスターで通常のタスクを実行するか、Apache SparkとOozieを友達にする







Oozieを介して通常のSparkタスクの起動を実装する必要性は長い間空いていましたが、すべての手が届きませんでした。 この記事では、全体のプロセスを説明したいと思います。おそらくそれはあなたの人生を単純化するでしょう。







内容





挑戦する



hdfsには次の構造があります。







hdfs://hadoop/project-MD2/data hdfs://hadoop/project-MD2/jobs hdfs://hadoop/project-MD2/status
      
      





data



ディレクトリには毎日data



が提供され、日付に従ってディレクトリに分解されます。 たとえば、2017年12月31日のデータは、次の方法でhdfs://hadoop/project/data/2017/12/31/20171231.csv.gz



れますhdfs://hadoop/project/data/2017/12/31/20171231.csv.gz









入力形式




jobs



ディレクトリには、プロジェクトに直接関連するタスクが含まれています。 また、タスクをこのディレクトリに配置します。

統計は、json形式で各日の空のフィールドの数(nullの値)のstatus



ディレクトリに保存する必要があります。 たとえば、2017年12月31日のデータの場合、 hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json



ファイルが表示されhdfs://hadoop/project-MD2/status/2017/12/31/part-*.json









JSONファイルを受け入れます:


 { "device_id_count_empty" : 0, "lag_A0_count_empty" : 10, "lag_A1_count_empty" : 0, "flow_1_count_empty" : 37, "flow_2_count_empty" : 100 }
      
      





ハードウェアとインストールされたソフトウェア



10マシンのクラスターを自由に使用できます。各クラスターには8コアプロセッサと64 GBのRAMがあります。 すべてのマシンのハードドライブの合計容量は100 TBです。 クラスターでタスクを開始するには、 PROJECTS



キューが割り当てられます。







インストールされたソフトウェア:




Sparkタスクの作成



プロジェクト構造を作成します。以下に示すように、scalaをサポートする開発環境またはコンソールから実行するのは非常に簡単です。







 mkdir -p daily-statistic/project echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties echo "" > daily-statistic/project/plugins.sbt echo "" > daily-statistic/build.sbt mkdir -p daily-statistic/src/main/scala
      
      





daily-statistic/project/plugins.sbt



、アセンブリのプラグインを追加します。これには、 daily-statistic/project/plugins.sbt



ファイルに次の行を追加します。







 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
      
      





プロジェクト、依存関係、およびアセンブリ機能の説明をdaily-statistic/build.sbt









 name := "daily-statistic" version := "0.1" scalaVersion := "2.11.11" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.0" % "provided", "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided" ) assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
      
      





daily-statistic



ディレクトリに移動し、 sbt update



コマンドを実行してプロジェクトを更新し、リポジトリから依存関係をプルアップします。

src/main/scala/ru/daily



ディレクトリにStatistic.scala



を作成します







タスクコード:


 package ru.daily import org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ object Statistic extends App { //  implicit lazy val spark: SparkSession = SparkSession.builder() .appName("daily-statistic") .getOrCreate() import spark.implicits._ val workDir = args(0) val datePart = args(1) val saveDir = args(2) try { val date = read(s"$workDir/$datePart/*.csv.gz") .select( '_c0 as "device_id", '_c1 as "lag_A0", '_c2 as "lag_A1", '_c3 as "flow_1", '_c4 as "flow_2" ) save(s"$saveDir/$datePart", agg(date)) } finally spark.stop() //    def read(path: String)(implicit spark: SparkSession): DataFrame = { val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip") spark.read .options(inputFormat) .csv(path) } //   def agg(data: DataFrame):DataFrame = data .withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0)) .withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0)) .withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0)) .withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0)) .withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0)) .agg( sum('device_id_empty) as "device_id_count_empty", sum('lag_A0_empty) as "lag_A0_count_empty", sum('lag_A1_empty) as "lag_A1_count_empty", sum('flow_1_empty) as "flow_1_count_empty", sum('flow_2_empty) as "flow_2_count_empty" ) //   def save(path: String, data: DataFrame): Unit = data.write.json(path) }
      
      





daily-statistic



ディレクトリからsbt assembly



コマンドを使用しsbt assembly



プロジェクトをsbt assembly



ます。 アセンブリが正常に完了すると、 daily-statistic-0.1.jar



タスクを含むパッケージがdaily-statistic/target/scala-2.11



daily-statistic-0.1.jar



ます。







workflow.xmlを書く



Oozieを介してタスクを実行するには、 workflow.xml



ファイルで起動構成を記述する必要があります。 以下がタスクの例です。







 <workflow-app name="project-md2-daily-statistic" xmlns="uri:oozie:workflow:0.5"> <global> <configuration> <property> <name>oozie.launcher.mapred.job.queue.name</name> <value>${queue}</value> </property> </configuration> </global> <start to="project-md2-daily-statistic" /> <action name="project-md2-daily-statistic"> <spark xmlns="uri:oozie:spark-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <master>yarn-client</master> <name>project-md2-daily-statistic</name> <class>ru.daily.Statistic</class> <jar>${nameNode}${jobDir}/lib/daily-statistic-0.1.jar</jar> <spark-opts> --queue ${queue} --master yarn-client --num-executors 5 --conf spark.executor.cores=8 --conf spark.executor.memory=10g --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.yarn.jars=*.jar --conf spark.yarn.queue=${queue} </spark-opts> <arg>${nameNode}${dataDir}</arg> <arg>${datePartition}</arg> <arg>${nameNode}${saveDir}</arg> </spark> <ok to="end" /> <error to="fail" /> </action> <kill name="fail"> <message>Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> <end name="end" /> </workflow-app>
      
      





global



ブロックでは、タスクを見つけて実行するMapReduceタスクのキューが設定されます。

action



ブロックは、アクション、この場合はスパークタスクの起動、および完了時に実行する必要があるものをステータス



またはERROR



記述します。

spark



ブロックでは、環境が定義され、タスクが構成され、引数が渡されます。 タスクの起動構成については、 spark-opts



説明していspark-opts



。 パラメータは公式ドキュメントに記載されています。

タスクがステータスERROR



で完了すると、実行はERROR



終了ブロックに渡され、複数のエラーメッセージが表示されます。

中括弧内のパラメーター、たとえば${queue}



、起動時に決定します。







coordinator.xmlの作成



定期的な起動を整理するには、別のcoordinator.xml



が必要です。 以下がタスクの例です。







 <coordinator-app name="project-md2-daily-statistic-coord" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1"> <action> <workflow> <app-path>${workflowPath}</app-path> <configuration> <property> <name>datePartition</name> <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}</value> </property> </configuration> </workflow> </action> </coordinator-app>
      
      





ここでは、実行頻度、タスクの開始日時、タスクの終了日時をそれぞれ決定するfrequency



start



end



パラメーターからおもしろい。

workflow



ブロックでは、 workflow.xml



ファイルがあるディレクトリへのパスが指定されています。これは、後で起動時に指定します。

configuration



ブロックで、 datePartition



プロパティーの値がdatePartition



されます。この場合、 yyyy/MM/dd



から1日を引いた形式の現在の日付に等しくなります。







hdfsでプロジェクトをホストする


既に述べたように、タスクをhdfs://hadoop/project-MD2/jobs



ディレクトリ:







 hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib
      
      





ここでは、原則として、 sharelib



ディレクトリを除き、コメントなしですべてが明確になります。 このディレクトリに、タスクの作成プロセスで使用されたすべてのライブラリを配置します。 私たちの場合、これらはすべてプロジェクトの依存関係で指定したSpark 2.0.0ライブラリです。 なぜこれが必要なのですか? 事実は、プロジェクトの依存関係で"provided"



を指定したことです。 これは、ビルドシステムがプロジェクトに依存関係を含める必要がなく、それらがスタートアップ環境によって提供されるが、世界が静止していないことを意味します。 タスクはこの更新の影響を受けやすいため、 sharelib



ディレクトリのライブラリセットを使用して実行します。 これがどのように構成されているかを以下に示します。







定期的に実行する



そして、すべてが発射のエキサイティングな瞬間に備えています。 コンソールからタスクを実行します。 起動時に、 xml



ファイルで使用したプロパティの値を設定する必要があります。 これらのプロパティを別のcoord.properties



ファイルにcoord.properties



ます。







 #   nameNode=hdfs://hadoop jobTracker=hadoop.host.ru:8032 #      coordinator.xml oozie.coord.application.path=/project-MD2/jobs/daily-statistic #    (  24 ) frequency=1440 startTime=2017-09-01T07:00Z endTime=2099-09-01T07:00Z #      workflow.xml workflowPath=/project-MD2/jobs/daily-statistic #  ,      mapreduce.job.user.name=username user.name=username #        dataDir=/project-MD2/data saveDir=/project-MD2/status jobDir=/project-MD2/jobs/daily-statistic #     queue=PROJECTS #       hdfs   oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib oozie.use.system.libpath=false
      
      





すごい、すべてがこする準備ができています。 次のコマンドで通常の実行を開始します。







 oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run
      
      





開始後、タスクのjob_idがコンソールに表示されます。 このjob_idを使用すると、タスクのステータスに関する情報を確認できます。







 oozie job -info {job_id}
      
      





タスクを停止します。







 oozie job -kill {job_id}
      
      





job_idタスクがわからない場合は、ユーザーのすべての通常のタスクを表示して見つけることができます。







 oozie jobs -jobtype coordinator -filter user={user_name}
      
      





おわりに



それは少し遅れたことが判明しましたが、私の意見では、詳細な指示はインターネットでのクエスト検索よりも優れています。 説明された経験があなたの役に立つことを願っています、あなたの注意に感謝します!








All Articles