過激なオラクルの目から見たBigdataスタック

Habréやその他のインターネットでは、ほぼ毎日ビッグデートに関する空の記事を投稿しており、ビッグデートのスタックの背後にはマーケティング以外の何物もないという専門家の強い気持ちを生み出しています。 実際、Hadoopフードの下には非常に多くの興味深いテクノロジーがあります。ここでは、Oracleの経験を持つ技術スペシャリストの見た目でマーケティングを少し希釈したいと思います。



まず第一に、Hadoop bigdatesの柱の1つは、バッチ処理とmap-reduceだけではなく、多くの人が描写しようとしていることを理解することは価値があります。 反対のタスクスペクトルから簡単に処理できます。たとえば、IoT(Hadoopのスパーク、Kafkaストリームの読み取り)からの小さなメッセージのストリームの読み取り、その場での偏差の集約と検出です。 これらは、JOINを備えたレポートシステムからImpalaを介して寄せ木細工のファイルへの小さなクエリのクラウドです。 これらはすべて同じHadoopエコシステムです。 成熟度が異なり、異なるアプローチを必要とするものがたくさんあります。 そして、私を信じてください、誰もが最も勝利するオプションを本当に知りません。 どこかでタスクはhdfsの古典的なmap-reduceと十分な原始的な寄せ木細工に完全に適合し、どこかでSparkはほとんどテキストファイルに通常のSQL JOINを使用します。 Clouderaの最大のディストリビューションは、Kuduデータベースに似たものを積極的に宣伝しています。これは、Javaのmap-reduceスタイルやImpalaのSQLで使用できます。



多くのものがあり、これらのほとんどすべてが互いに組み合わされています。組み合わせによって、処理方法は非常に大きく異なる場合があります。 原理的には、90年代後半、ゼロの始まりであり、あらゆる種類のxmlテーブル、Javaストアドプロシージャ、その他の奇妙なものをsubdに押し込み始めたOracleにリモートで似ています。



Hadoop / map-reduceを使用してプロジェクトに初めて参加したとき、それは完全に理解不能でした。そのため、このアプローチでは本格的なDBMSと競合できます。 最初に、マッパーはすべてを読み取り、リデューサーの出力をhdfsに書き込み、次にリデューサーの読み取り、書き込みを繰り返します。 オラクルの美しさの概念では、これは間違いなく失敗しないように思われました。 しかし、後で何のために理解がありました。



Oracleがディスク上に「余分な」書き込みを行うことで並列実行を増加させようとする、一見多くの不要な作業の様子について。 たとえば、Oracleは「余分な」UNDOで記述し、競合する読者を育てることができます。 ロックをデータブロックに書き込みます。これにより、ロックの膨大なリストをメモリに保持せずに、RACクラスタノード間でロックに関する情報を透過的に交換できます。 ほぼ同じ方法で、map-reduceの余分に見える落書きを見てみる価値があります。 最終的にこのすべての「余分な」ことにより、Oracleの背景に対して、より多くのタスクを並行して実行できます。



同時に、寄木細工のファイルのサイズにすぐに驚きました。80〜100 GBのインデックスのないOracleのプレートは、簡単に20〜30 GBの寄木細工になります。 その結果、map-reduceはディスクから数倍少ない読み取りを行い、すぐに多数のクラスターコアを読み込みますが、Oracleはより多くを読み取り、すべての計算を単一のユーザープロセスに配置する必要があります。 これはPL / SQLマシンの庭にある石ですが、SQLエンジンには並列読み取りのトピックに関する多くのニュアンスもあります。



Hadoopでのロジックの実装がどのように見えるかを明確にするために、sql.ru Oracleブランチの「金曜日の問題」の1つを説明できます。 原則として、すべてはmap-reduceとSparkのトピックに関する戦いから始まりました。



タスクはこれです:



調整された値ごとに検出するクエリを作成します

1)前のオリジナル(ID順)

2)変数によって設定されたシフトを持つオリジナル(前のシフトはシフト0と見なされます)

3)前のオリジナルの数。その量が指定された制限を超えないようにします



varシフト番号

var limit number

exec:shift:= 2;



PL / SQLプロシージャが正常に完了しました。



exec:limit:= 2000;



PL / SQLプロシージャが正常に完了しました。


Oracleでは、次のようにソリューションが提案されました。



SQL> select t.id, 2 t.type, 3 t.value, 4 decode(type, 'adjusted', max(decode(type, 'original', value)) over(partition by grp)) prev_o_value, 5 decode(type, 'adjusted', max(decode(type, 'original', shift_n)) over(partition by grp)) prev_shift_n_o_value, 6 decode(type, 'adjusted', count(decode(type, 'original', id)) 7 over(order by orig_val_running_sum range between 2000 preceding and 1 preceding)) count_o 8 from (select t.*, 9 sum(decode(type, 'original', value)) over(order by id) - decode(type, 'original', value, 0) orig_val_running_sum, 10 decode(type, 'original', lag(value, 2) over(order by decode(type, 'original', 0, 1), id)) shift_n, 11 sum(decode(type, 'original', 1)) over(order by id) grp 12 from t) t 13 order by id; ID TYPE VALUE PREV_O_VALUE PREV_SHIFT_N_O_VALUE COUNT_O ---------- ------------------------------ ---------- ------------ -------------------- ---------- 10 original 100 20 original 200 30 adjusted 300 200 2 40 original 400 50 adjusted 500 400 100 3 60 original 600 70 original 700 80 adjusted 800 700 400 5 90 adjusted 900 700 400 5 100 original 1000 110 adjusted 1100 1000 600 2 120 original 1200 130 adjusted 1300 1200 700 1 140 original 1400 150 adjusted 1500 1400 1000 1 15 rows selected.
      
      





SparkSQLでは、3番目の段落なしで問題が解決されます。



 select t.id, t.type, t.value, case when type = 'adjusted' then max(case when type = 'original' then value end) over (partition by position, grp) end prev_o_value, case when type = 'adjusted' then max(case when type = 'original' then shift_n end) over (partition by position, grp) end prev_shift_n_o_value from (select t.*, case when type = 'original' then lag(value, 2) over (partition by position order by case when type = 'original' then 0 else 1 end, id) end shift_n, sum(case when type = 'original' then 1 end) over (partition by position order by id) grp from t) t order by id +---+--------+-----+------------+--------------------+ |id |type |value|prev_o_value|prev_shift_n_o_value| +---+--------+-----+------------+--------------------+ |10 |original|100 |null |null | |20 |original|200 |null |null | |30 |adjusted|300 |200 |null | |40 |original|400 |null |null | |50 |adjusted|500 |400 |100 | |60 |original|600 |null |null | |70 |original|700 |null |null | |80 |adjusted|800 |700 |400 | |90 |adjusted|900 |700 |400 | |100|original|1000 |null |null | |110|adjusted|1100 |1000 |600 | |120|original|1200 |null |null | |130|adjusted|1300 |1200 |700 | |140|original|1400 |null |null | |150|adjusted|1500 |1400 |1000 | +---+--------+-----+------------+--------------------+
      
      





ご覧のとおり、spark-sqlのソリューションはOracleとほとんど同じで、違いはありません。 そして、これがmap-reduceソリューションです:



マッパー



 public class ParquetMapper extends Mapper<LongWritable, GenericRecord, Text, AvroValue<GenericRecord>> { private final Text outputKey = new Text(); private final AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>(); @Override protected void map(LongWritable key, GenericRecord value, Context context) throws IOException, InterruptedException { outputKey.set(String.valueOf(value.get("position"))); outputValue.datum(value); context.write(outputKey, outputValue); } }
      
      





減速機



 public class ParquetReducer extends Reducer<Text, AvroValue<GenericRecord>, Void, Text> { private static final byte shift = 2 ; private TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>> rows = new TreeMap<Integer,AbstractMap.SimpleEntry<String, Integer>>(); List<Integer> queue = new LinkedList<Integer>(); private String adj = ""; private int lastValue = -1; @Override protected void reduce(Text key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException { for (AvroValue<GenericRecord> value : values) { rows.put((Integer) value.datum().get("id"), new AbstractMap.SimpleEntry(value.datum().get("type"), value.datum().get("value"))) ; } for(Map.Entry<Integer, AbstractMap.SimpleEntry<String, Integer>> entry : rows.entrySet()) { AbstractMap.SimpleEntry<String, Integer> rowData = entry.getValue(); if (rowData.getKey().equals("original")) { lastValue = rowData.getValue() ; queue.add(lastValue) ; adj = "" ; } else { adj = " " + String.valueOf(lastValue); if (queue.size()- shift >0) { adj = adj + " " + queue.get(queue.size()-shift).toString() ; } } Text output = new Text(entry.getKey()+" "+rowData.getKey() + " " + rowData.getValue() + adj); context.write(null, output ); } } }
      
      





打ち上げ



[yarn@sandbox map-reduce]$ hadoop fs -cat /out/part-r-00000

10 original 100

20 original 200

30 adjusted 300 200

40 original 400

50 adjusted 500 400 200

60 original 600

70 original 700

80 adjusted 800 700 600

90 adjusted 900 700 600

100 original 1000

110 adjusted 1100 1000 700

120 original 1200

130 adjusted 1300 1200 1000

140 original 1400

150 adjusted 1500 1400 1200








結論:bigdateスタックは巨大であり、独自のパラダイムで動作する多くのサブシステムがあります。map-reduceはボルトの1つに過ぎず、Sparkの全盛期に照らして、誰もが今必要としているという事実ではありません。 bigdatesに実装されているパラダイムはバッチ処理だけではありません。Sparkでは、宣言型SQLを含むロジックを作成できます。 Map-reduceは一部のタスクにも非常にうまく配置されますが、同時に深刻なOracleサーバーのみが最近実行できたタスクを簡単に解決します。 コードをよく見ると、Spark-sqlがタスクの項目3をまだ実装していないことがわかりますが、map-reduceコードは宣言的ではありませんが、非常にエレガントで簡単に拡張できることがわかりました。 タスクの項目3は、平均的なousさの開発者によって数分で簡単にmap-reduceコードに追加されますが、Oracleソリューションに分析関数を積み上げるには、開発者の真剣な準備が必要です。



All Articles