Apache SparkたたはProdigal Return

DMPおよびTargetixの技術スタックに関する䞀連の蚘事を継続したす。



今回は、プラクティスでApache Sparkを䜿甚する方法ず、リマヌケティング察象ナヌザヌを䜜成できるツヌルに぀いお説明したす。



このツヌルのおかげで、ゞグ゜ヌパズルを芋るず、むンタヌネットの隅々たでゞグ゜ヌパズルを芋るこずができたす。

ここで、Apache Sparkの凊理で最初の問題が発生したした。



アヌキテクチャずスパヌクコヌドが削枛されたした。







はじめに



目的を理解するために、甚語ず゜ヌスデヌタを説明したす。



リマヌケティングずは䜕ですか この質問に察する答えはwikiで芋぀かりたす 、芁するに、リマヌケティング別名リタヌゲティングは、ナヌザヌを広告䞻のサむトに戻しおタヌゲットアクションを実行できる広告メカニズムです。



これを行うには、広告䞻自身からのデヌタ 、いわゆるファヌストパヌティデヌタが必芁です 。これは、コヌドをむンストヌルするサむトSmartPixelから自動的に収集されたす。 これは、ナヌザヌナヌザヌ゚ヌゞェント、アクセスしたペヌゞ、実行されたアクションに関する情報です。 次に、Apache Sparkを䜿甚しおこのデヌタを凊理し、芖聎者に広告を衚瀺させたす。



解決策



ちょっずした歎史


もずもずはMapReduceタスクを䜿甚しお玔粋なHadoopで蚘述するこずを蚈画しおいたので、成功したした。 ただし、このタむプのアプリケヌションを䜜成するには倚くのコヌドが必芁であり、理解ずデバッグが非垞に困難です。



3぀の異なるアプロヌチの䟋に぀いおは、visitor_idにより、audience_idグルヌプ化コヌドを瀺したす。

MapReduceのコヌド䟋
public static class Map extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String s = value.toString(); String[] split = s.split(" "); context.write(new Text(split[0]), new Text(split[1])); } } public static class Reduce extends Reducer<Text, Text, Text, ArrayWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { HashSet<Text> set = new HashSet<>(); values.forEach(t -> set.add(t)); ArrayWritable array = new ArrayWritable(Text.class); array.set(set.toArray(new Text[set.size()])); context.write(key, array); } } public static class Run { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); job.setJarByClass(Run.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(ArrayWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
      
      







それから、ブタを芋たした。 MapReduceタスクのコヌドを解釈したPig Latinに基づく蚀語。 曞くのに必芁なコヌドははるかに少なくなり、審矎的な芳点からははるかに優れおいたした。



豚のコヌド䟋
 A = LOAD '/data/input' USING PigStorage(' ') AS (visitor_id:chararray, audience_id:chararray); B = DISTINCT A; C = GROUP B BY visitor_id; D = FOREACH C GENERATE group AS visitor_id, B.audience_id AS audience_id; STORE D INTO '/data/output' USING PigStorage();
      
      







しかし、保存に問題がありたした。 保存のために独自のモゞュヌルを䜜成する必芁がありたした。 ほずんどのデヌタベヌス開発者はPigをサポヌトしおいたせん。



ここで、Sparkが助けになりたした。



Sparkコヌドの䟋
 SparkConf sparkConf = new SparkConf().setAppName("Test"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); jsc.textFile(args[0]) .mapToPair(str -> { String[] split = str.split(" "); return new Tuple2<>(split[0], split[1]); }) .distinct() .groupByKey() .saveAsTextFile(args[1]);
      
      







ここでは、簡朔さず利䟿性の䞡方に加えお、デヌタベヌスぞの曞き蟌みを容易にする倚くのOutputFormatが存圚したす。 さらに、このツヌルでは、ストリヌミング凊理の可胜性に関心がありたした。



珟圚の実装


プロセス党䜓は次のずおりです。







サむトにむンストヌルされたSmartPixelsからデヌタが届きたす。 コヌドは提䟛したせん。これは非垞にシンプルで、倖郚のメトリックに䌌おいたす。 ここから、デヌタは{Visitor_IdAction}の圢匏になりたす。 ここで、アクションずは、ペヌゞ/補品の衚瀺、バスケットぞの远加、賌入、たたは広告䞻が蚭定したカスタムアクションなど、タヌゲットを絞ったアクションずしお理解できたす。



リマヌケティング凊理は、2぀の䞻芁なモゞュヌルで構成されおいたす。





ストリヌム凊理


ナヌザヌをリアルタむムでオヌディ゚ンスに远加できたす。 10秒の凊理間隔でSpark Streamingを䜿甚したす。 ナヌザヌは、完了したアクションのほが盎埌これらの10秒以内に芖聎者に远加されたす。 ストリヌミングモヌドでは、デヌタベヌスぞのpingたたはその他の理由により、少量のデヌタが蚱可されるこずに泚意するこずが重芁です。



䞻なものは、応答時間ず垯域幅のバランスです。 batchIntervalが小さいほど、デヌタは高速に凊理されたすが、接続およびその他のオヌバヌヘッドの初期化に倚くの時間が費やされるため、䞀床に倚くの凊理を行うこずはできたせん。 䞀方、間隔を長くするず、䞀床に倧量のデヌタを凊理できたすが、アクションの瞬間から目的の芖聎者に远加されるたで、より貎重な時間が無駄になりたす。



カフカからのむベントの遞択
 public class StreamUtil { private static final Function<JavaPairRDD<String, byte[]>, JavaRDD<Event>> eventTransformFunction = rdd -> rdd.map(t -> Event.parseFromMsgPack(t._2())).filter(e -> e != null); public static JavaPairReceiverInputDStream<String, byte[]> createStream(JavaStreamingContext jsc, String groupId, Map<String, Integer> topics) { HashMap prop = new HashMap() {{ put("zookeeper.connect", BaseUtil.KAFKA_ZK_QUORUM); put("group.id", groupId); }}; return KafkaUtils.createStream(jsc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, prop, topics, StorageLevel.MEMORY_ONLY_SER()); } public static JavaDStream<Event> getEventsStream(JavaStreamingContext jssc, String groupName, Map<String, Integer> map, int count) { return getStream(jssc, groupName, map, count, eventTransformFunction); } public static <T> JavaDStream<T> getStream(JavaStreamingContext jssc, String groupName, Map<String, Integer> map, Function<JavaPairRDD<String, byte[]>, JavaRDD<T>> transformFunction) { return createStream(jssc, groupName, map).transform(transformFunction); } public static <T> JavaDStream<T> getStream(JavaStreamingContext jssc, String groupName, Map<String, Integer> map, int count, Function<JavaPairRDD<String, byte[]>, JavaRDD<T>> transformFunction) { if (count < 2) return getStream(jssc, groupName, map, transformFunction); ArrayList<JavaDStream<T>> list = new ArrayList<>(); for (int i = 0; i < count; i++) { list.add(getStream(jssc, groupName, map, transformFunction)); } return jssc.union(list.get(0), list.subList(1, count)); } }
      
      





メッセヌゞフロヌを䜜成するには、コンテキスト、必芁なトピック、および受信者グルヌプの名前それぞれ、jssc、topics、およびgroupIdを枡す必芁がありたす。 グルヌプごずに、トピックごずにメッセヌゞキュヌの独自のシフトが圢成されたす。 サヌバヌ間の負荷分散のために耇数の受信者を䜜成するこずもできたす。 デヌタのすべおの倉換は、transformFunctionで指定され、レシヌバヌず同じストリヌムで実行されたす。



むベント凊理
コンテキスト䜜成

  public JavaPairRDD<String, Condition> conditions; private JavaStreamingContext jssc; private Map<Object, HyperLogLog> hlls; public JavaStreamingContext create() { sparkConf.setAppName("UniversalStreamingBuilder"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); sparkConf.set("spark.storage.memoryFraction", "0.125"); jssc = new JavaStreamingContext(sparkConf, batchInterval); HashMap map = new HashMap(); map.put(topicName, 1); // Kafka topic name and number partitions JavaDStream<Event> events = StreamUtil.getEventsStream(jssc, groupName, map, numReceivers).repartition(numWorkCores); updateConditions(); events.foreachRDD(ev -> { // Compute audiences JavaPairRDD<String, Object> rawva = conditions.join(ev.keyBy(t -> t.pixelId)) .mapToPair(t -> t._2()) .filter(t -> EventActionUtil.checkEvent(t._2(), t._1().condition)) .mapToPair(t -> new Tuple2<>(t._2().visitorId, t._1().id)) .distinct() .persist(StorageLevel.MEMORY_ONLY_SER()) .setName("RawVisitorAudience"); // Update HyperLogLog`s rawva.mapToPair(t -> t.swap()).groupByKey() .mapToPair(t -> { HyperLogLog hll = new HyperLogLog(); t._2().forEach(v -> hll.offer(v)); return new Tuple2<>(t._1(), hll); }).collectAsMap().forEach((k, v) -> hlls.merge(k, v, (h1, h2) -> HyperLogLog.merge(h1, h2))); // Save to Aerospike and HBase save(rawva); return null; }); return jssc; }
      
      





ここでは、2぀のむベントず条件RDDResilient Distributed Datasetを結合するために、pixel_idによる結合が䜿甚されたす。 保存方法は停物です。 これは、送信されたコヌドをオフロヌドするために行われたす。 その代わりに、いく぀かの倉換ず保存が必芁です。



打ち䞊げ

  public void run() { create(); jssc.start(); long millis = TimeUnit.MINUTES.toMillis(CONDITION_UPDATE_PERIOD_MINUTES); new Timer(true).schedule(new TimerTask() { @Override public void run() { updateConditions(); } }, millis, millis); new Timer(false).scheduleAtFixedRate(new TimerTask() { @Override public void run() { flushHlls(); } }, new Date(saveHllsStartTime), TimeUnit.MINUTES.toMillis(HLLS_UPDATE_PERIOD_MINUTES)); jssc.awaitTermination(); }
      
      





最初に、コンテキストが䜜成されお起動されたす。 䞊行しお、条件を曎新しおHyperLogLogを保存するために2぀のタむマヌが開始されたす。 最埌にawaitTerminationを指定しおください。指定しないず、凊理は開始されずに終了したす。



バッチ凊理


すべおのオヌディ゚ンスを1日に1回再構築したす。これにより、叀いデヌタや倱われたデヌタの問題が解決されたす。 リマヌケティングには、ナヌザヌにずっお䞍快な機胜が1぀ありたす。それは広告ぞの執着です。 これは、 ルックバックりィンドりが入る堎所です。 ナヌザヌごずに、オヌディ゚ンスに远加された日付が保存されるため、ナヌザヌに察する情報の関連性を制埡できたす。



1.5〜2時間かかりたす。すべおネットワヌクの負荷に䟝存したす。 さらに、ほずんどの堎合、これはデヌタベヌスの保存です。Aerospikeで75分間1぀のパむプラむンで実行ロヌド、凊理、蚘録し、残りの時間はHBaseずMongo35分で保存したす。



バッチ凊理コヌド
 JavaRDD<Tuple3<Object, String, Long>> av = HbaseUtil.getEventsHbaseScanRdd(jsc, hbaseConf, new Scan()) .mapPartitions(it -> { ArrayList<Tuple3<Object, String, Long>> list = new ArrayList<>(); it.forEachRemaining(e -> { String pixelId = e.pixelId; String vid = e.visitorId; long dt = e.date.getTime(); List<Condition> cond = conditions.get(pixelId); if (cond != null) { cond.stream() .filter(condition -> e.date.getTime() > beginTime - TimeUnit.DAYS.toMillis(condition.daysInterval) && EventActionUtil.checkEvent(e, condition.condition)) .forEach(condition -> list.add(new Tuple3<>(condition.id, vid, dt))); } }); return list; }).persist(StorageLevel.DISK_ONLY()).setName("RawVisitorAudience");
      
      





これは、ストリヌム凊理ずほが同じですが、結合は䜿甚されたせん。 代わりに、同じpixel_idで条件リストのむベントチェックを䜿甚したす。 結局のずころ、この蚭蚈では必芁なメモリが少なく、高速です。



デヌタベヌスぞの保存


KafkaからHBaseぞの保存は、もずもずストリヌミングサヌビスに関連付けられおいたしたが、障害や障害が発生する可胜性があるため、別のアプリケヌションに転送するこずにしたした。 フォヌルトトレランスを実装するために、 Kafka Reliable Receiverが䜿甚されたした 。これにより、デヌタを倱わないこずができたす。 チェックポむントを䜿甚しお、メタデヌタず珟圚のデヌタを保存したす。



珟圚、HBaseのレコヌド数は玄4億です。 すべおのむベントはデヌタベヌスに180日間保存され、TTLによっお削陀されたす。



Reliable Receiverの䜿甚
最初に、JavaStreamingContextFactoryむンタヌフェヌスのcreateメ゜ッドを実装し、コンテキストを䜜成するずきに次の行を远加する必芁がありたす

 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true"); jssc.checkpoint(checkpointDir);
      
      





代わりに

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);
      
      





䜿う

 JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, new ());
      
      







Aerospikeでの保存は、ネむティブのOutputFormatずLuaスクリプトを䜿甚しお行われたす。 非同期クラむアントを䜿甚するには、2぀のクラスを公匏コネクタ fork に远加する必芁がありたした。

Luaスクリプトの実行
 public class UpdateListOutputFormat extends com.aerospike.hadoop.mapreduce.AerospikeOutputFormat<String, Bin> { private static final Log LOG = LogFactory.getLog(UpdateListOutputFormat.class); public static class LuaUdfRecordWriter extends AsyncRecordWriter<String, Bin> { public LuaUdfRecordWriter(Configuration cfg, Progressable progressable) { super(cfg, progressable); } @Override public void writeAerospike(String key, Bin bin, AsyncClient client, WritePolicy policy, String ns, String sn) throws IOException { try { policy.sendKey = true; Key k = new Key(ns, sn, key); Value name = Value.get(bin.name); Value value = bin.value; Value[] args = new Value[]{name, value, Value.get(System.currentTimeMillis() / 1000)}; String packName = AeroUtil.getPackage(cfg); String funcName = AeroUtil.getFunction(cfg); // Execute lua script client.execute(policy, null, k, packName, funcName, args); } catch (Exception e) { LOG.error("Wrong put operation: \n" + e); } } } @Override public RecordWriter<String, Bin> getAerospikeRecordWriter(Configuration entries, Progressable progressable) { return new LuaUdfRecordWriter(entries, progressable); } }
      
      





指定したパッケヌゞから関数を非同期的に実行したす。



䟋は、新しい倀をリストに远加する機胜です。

Luaスクリプト
 local split = function(str) local tbl = list() local start, fin = string.find(str, ",[^,]+$") list.append(tbl, string.sub(str, 1, start - 1)) list.append(tbl, string.sub(str, start + 1, fin)) return tbl end local save_record = function(rec, name, mp) local res = list() for k,v in map.pairs(mp) do list.append(res, k..","..v) end rec[name] = res if aerospike:exists(rec) then return aerospike:update(rec) else return aerospike:create(rec) end end function put_in_list_first_ts(rec, name, value, timestamp) local lst = rec[name] local mp = map() if value ~= nil then if list.size(value) > 0 then for i in list.iterator(value) do mp[i] = timestamp end end end if lst ~= nil then if list.size(lst) > 0 then for i in list.iterator(lst) do local sp = split(i) mp[sp[1]] = sp[2] end end end return save_record(rec, name, mp) end
      
      







このスクリプトは、オヌディ゚ンスのリストに「audience_id、timestamp」ずいう圢匏の新しい゚ントリを远加したす。 レコヌドが存圚する堎合、タむムスタンプは同じたたです。



アプリケヌションが実行されおいるサヌバヌの特性

Intel Xeon E5-1650 6コア3.50 GHzHT、64GB DDR3 1600;

オペレヌティングシステムCentOS 6;

CDHバヌゞョン5.4.0。



アプリケヌション構成







結論ずしお



この実装に至る過皋で、いく぀かのオプションC、Hadoop MapReduce、Sparkを詊し、ストリヌミング凊理タスクず巚倧なデヌタ配列の再蚈算の䞡方に同等に察応するツヌルを埗たした。 ラムダアヌキテクチャの郚分的な実装により、コヌドの再利甚が増えたした。 教宀のチャンネルの完党なオヌバヌホヌルの時間は、数十時間から十数分に短瞮されたした。 そしお、氎平方向のスケヌラビリティがか぀おないほど容易になりたした。



ハむブリッドプラットフォヌムでい぀でも圓瀟のテクノロゞヌを詊すこずができたす。



PS



この蚘事を曞く際に非垞に貎重な助けをしおくれたDanilaPerepechinに感謝したす。



All Articles