Today we will talk about how we use Apache Spark in practice, and about the tool that lets us build audiences of users for retargeting.
It is thanks to tools like this that, once you have looked at a jigsaw puzzle, you keep seeing jigsaw puzzles in ads to the very edge of the internet.
This is also where we ran into our first problems with Apache Spark. The architecture and the Spark code follow below.
Introduction
To make the goal clear, let's first define the terms and describe the source data.
What is retargeting? The answer can be found on the wiki. In short, retargeting (also known as behavioral retargeting) is an advertising mechanism that brings users back to an advertiser's site to complete a target action.
To do this, we need data from the advertiser itself, so-called first-party data. It is collected automatically by sites that have our code, SmartPixel, installed: information about which pages users (user agents) visit and which actions they perform there. Apache Spark then processes this data, and the resulting audiences are shown ads.
Solution
A bit of history
Originally we planned to write everything in pure Hadoop using MapReduce jobs, and we did. However, an application of this kind takes a lot of code and is very hard to understand and debug.
As an illustration of the three different approaches, here is code that groups audience_id by visitor_id.
MapReduce code example:
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        String[] split = s.split(" ");
        context.write(new Text(split[0]), new Text(split[1]));
    }
}

public static class Reduce extends Reducer<Text, Text, Text, ArrayWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<Text> set = new HashSet<>();
        // Hadoop reuses the Text instance between iterations, so copy it
        values.forEach(t -> set.add(new Text(t)));
        ArrayWritable array = new ArrayWritable(Text.class);
        array.set(set.toArray(new Text[set.size()]));
        context.write(key, array);
    }
}

public static class Run {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJarByClass(Run.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // the map output types differ from the final output types,
        // so they have to be declared explicitly
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ArrayWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
But then we saw the light: a Pig Latin-based language whose code is interpreted into MapReduce jobs. It requires far less code and, from an aesthetic point of view, is far better.
Pig code example:
A = LOAD '/data/input' USING PigStorage(' ') AS (visitor_id:chararray, audience_id:chararray);
B = DISTINCT A;
C = GROUP B BY visitor_id;
D = FOREACH C GENERATE group AS visitor_id, B.audience_id AS audience_id;
STORE D INTO '/data/output' USING PigStorage();
However, we had a problem with storage: we had to write our own modules for it, since most database vendors do not support Pig.
And that is where Spark came to the rescue.
Spark code example:
SparkConf sparkConf = new SparkConf().setAppName("Test");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.textFile(args[0])
   .mapToPair(str -> {
       String[] split = str.split(" ");
       return new Tuple2<>(split[0], split[1]);
   })
   .distinct()
   .groupByKey()
   .saveAsTextFile(args[1]);
Besides being both concise and convenient, Spark has many OutputFormats that make writing to databases easy. On top of that, the tool interested us because of its streaming capabilities.
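To give a feel for it, here is a minimal sketch of writing a pair RDD to HBase through the stock TableOutputFormat; the table name visitor_audience and the column family d are assumptions for the example, not our production schema.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
import java.io.IOException;

public class HBaseSaveExample {
    // Writes (visitorId, audienceId) pairs to an HBase table via TableOutputFormat.
    public static void save(JavaPairRDD<String, String> rdd) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, "visitor_audience"); // hypothetical table
        Job job = Job.getInstance(conf);
        job.setOutputFormatClass(TableOutputFormat.class);

        rdd.mapToPair(t -> {
            Put put = new Put(Bytes.toBytes(t._1()));
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("audience"), Bytes.toBytes(t._2()));
            return new Tuple2<>(new ImmutableBytesWritable(), put);
        }).saveAsNewAPIHadoopDataset(job.getConfiguration());
    }
}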
Current implementation
The overall process looks like this.
Data arrives from the SmartPixels installed on the sites. We won't show their code: it is very simple and similar to any external analytics counter. The data has the form {Visitor_Id: Action}, where an action is any targeted action: viewing a page or product, adding an item to the cart, a purchase, or a custom action configured by the advertiser.
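The Event class itself is not shown in this article; a minimal sketch consistent with the fields the later listings use (pixelId, visitorId, date) might look like this, where the action field and the parseFromMsgPack body are assumptions.

import java.io.Serializable;
import java.util.Date;

// Minimal sketch of an event record; field names follow the later listings.
public class Event implements Serializable {
    public String pixelId;    // identifies the SmartPixel, i.e. the advertiser's site
    public String visitorId;  // identifies the user (user agent)
    public Date date;         // when the action happened
    public String action;     // page view, add-to-cart, purchase, custom action (assumed field)

    // Events arrive MsgPack-encoded from Kafka; this deserializes one message,
    // returning null on bad input (body omitted in this sketch).
    public static Event parseFromMsgPack(byte[] bytes) {
        /* deserialization omitted */
        return null;
    }
}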
The retargeting pipeline consists of two main modules:
- stream processing (streaming);
- batch processing.
Stream processing
Users can be added to audiences in real time. We use Spark Streaming with a 10-second batch interval, so a user joins an audience almost immediately after completing an action (within that 10-second batch). It is important to note that in streaming mode a small loss of data is acceptable, whether due to database pings or other reasons.
The key trade-off is between latency and throughput. The smaller the batchInterval, the sooner data is processed; but since a lot of time goes to initializing connections and other overhead, not much can be processed at once. Make the interval too long and you can process a lot of data at once, but precious time is lost between the moment of the action and the user being added to the target audience.
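For reference, the batch interval is simply the second argument of the streaming context; a minimal sketch with the 10-second interval mentioned above (Spark 1.x API, as used in the rest of the article):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// 10-second micro-batches: the latency/throughput knob discussed above.
SparkConf sparkConf = new SparkConf().setAppName("UniversalStreamingBuilder");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));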
Reading events from Kafka:
To create a message stream, you pass the context, the required topics, and the name of the consumer group (jssc, topics, and groupId respectively). Each group keeps its own cursor into the message queue per topic. You can create several receivers to spread the load across servers. All transformations of the data are applied to the same stream, using the recipe passed in transformFunction.
public class StreamUtil {
    private static final Function<JavaPairRDD<String, byte[]>, JavaRDD<Event>> eventTransformFunction =
            rdd -> rdd.map(t -> Event.parseFromMsgPack(t._2())).filter(e -> e != null);

    public static JavaPairReceiverInputDStream<String, byte[]> createStream(JavaStreamingContext jsc,
                                                                            String groupId,
                                                                            Map<String, Integer> topics) {
        HashMap<String, String> prop = new HashMap<String, String>() {{
            put("zookeeper.connect", BaseUtil.KAFKA_ZK_QUORUM);
            put("group.id", groupId);
        }};
        return KafkaUtils.createStream(jsc, String.class, byte[].class,
                StringDecoder.class, DefaultDecoder.class, prop, topics,
                StorageLevel.MEMORY_ONLY_SER());
    }

    public static JavaDStream<Event> getEventsStream(JavaStreamingContext jssc, String groupName,
                                                     Map<String, Integer> map, int count) {
        return getStream(jssc, groupName, map, count, eventTransformFunction);
    }

    public static <T> JavaDStream<T> getStream(JavaStreamingContext jssc, String groupName,
                                               Map<String, Integer> map,
                                               Function<JavaPairRDD<String, byte[]>, JavaRDD<T>> transformFunction) {
        return createStream(jssc, groupName, map).transform(transformFunction);
    }

    public static <T> JavaDStream<T> getStream(JavaStreamingContext jssc, String groupName,
                                               Map<String, Integer> map, int count,
                                               Function<JavaPairRDD<String, byte[]>, JavaRDD<T>> transformFunction) {
        if (count < 2) return getStream(jssc, groupName, map, transformFunction);
        // several receivers unioned into a single DStream to spread the load
        ArrayList<JavaDStream<T>> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(getStream(jssc, groupName, map, transformFunction));
        }
        return jssc.union(list.get(0), list.subList(1, count));
    }
}
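As a usage sketch (the topic name, group name, and receiver count here are made up), the helper above can be called like this:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.streaming.api.java.JavaDStream;

// Illustrative call: one Kafka topic with one consumer thread per receiver,
// four receivers unioned into a single DStream of parsed events.
Map<String, Integer> topics = new HashMap<>();
topics.put("smartpixel-events", 1);
JavaDStream<Event> events = StreamUtil.getEventsStream(jssc, "retargeting-streaming", topics, 4);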
Event processing:
Creating the context
public JavaPairRDD<String, Condition> conditions;
private JavaStreamingContext jssc;
private Map<Object, HyperLogLog> hlls;

public JavaStreamingContext create() {
    sparkConf.setAppName("UniversalStreamingBuilder");
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.storage.memoryFraction", "0.125");
    jssc = new JavaStreamingContext(sparkConf, batchInterval);
    HashMap<String, Integer> map = new HashMap<>();
    map.put(topicName, 1); // Kafka topic name and number of partitions
    JavaDStream<Event> events =
            StreamUtil.getEventsStream(jssc, groupName, map, numReceivers).repartition(numWorkCores);
    updateConditions();
    events.foreachRDD(ev -> {
        // Compute audiences
        JavaPairRDD<String, Object> rawva = conditions.join(ev.keyBy(t -> t.pixelId))
                .mapToPair(t -> t._2())
                .filter(t -> EventActionUtil.checkEvent(t._2(), t._1().condition))
                .mapToPair(t -> new Tuple2<>(t._2().visitorId, t._1().id))
                .distinct()
                .persist(StorageLevel.MEMORY_ONLY_SER())
                .setName("RawVisitorAudience");
        // Update HyperLogLog`s
        rawva.mapToPair(t -> t.swap()).groupByKey()
                .mapToPair(t -> {
                    HyperLogLog hll = new HyperLogLog();
                    t._2().forEach(v -> hll.offer(v));
                    return new Tuple2<>(t._1(), hll);
                }).collectAsMap()
                .forEach((k, v) -> hlls.merge(k, v, (h1, h2) -> HyperLogLog.merge(h1, h2)));
        // Save to Aerospike and HBase
        save(rawva);
        return null;
    });
    return jssc;
}
Here a join on pixel_id is used to combine the two RDDs (Resilient Distributed Datasets) of events and conditions. The save method is a stub: it is left out so as not to overload the listing; in reality it holds several more transformations and the actual writes.
Launch
public void run() {
    create();
    jssc.start();
    long millis = TimeUnit.MINUTES.toMillis(CONDITION_UPDATE_PERIOD_MINUTES);
    // periodically re-read the audience conditions (daemon timer)
    new Timer(true).schedule(new TimerTask() {
        @Override
        public void run() {
            updateConditions();
        }
    }, millis, millis);
    // periodically flush the HyperLogLog counters
    new Timer(false).scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            flushHlls();
        }
    }, new Date(saveHllsStartTime), TimeUnit.MINUTES.toMillis(HLLS_UPDATE_PERIOD_MINUTES));
    jssc.awaitTermination();
}
First the context is created and started. In parallel, two timers are launched: one updates the conditions and the other saves the HyperLogLogs. Be sure to call awaitTermination() at the end, otherwise the processing will stop right after it starts.
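The article does not show flushHlls; purely as a sketch, assuming our HyperLogLog class exposes a cardinality() estimate and that audienceSizeStore is some external writer (both assumptions), it might look like this:

// Hypothetical flush: copy the counters under a lock, then persist the
// audience-size estimates; hlls is the map from the create() listing.
private void flushHlls() {
    Map<Object, HyperLogLog> snapshot;
    synchronized (hlls) {
        snapshot = new HashMap<>(hlls);
    }
    snapshot.forEach((audienceId, hll) ->
            audienceSizeStore.put(audienceId, hll.cardinality())); // assumed API
}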
Batch processing
We rebuild all the audiences once a day. That solves the problem of outdated and lost data. Retargeting has one trait that users find unpleasant: the intrusiveness of the ads. This is where the lookback window comes in: for each user we store the date they were added to every audience, which lets us control how fresh our information about the user is.
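In code the lookback window is just a time predicate on the event date; the filter in the batch listing below boils down to this (the helper itself is illustrative):

import java.util.concurrent.TimeUnit;

// The lookback window as a predicate: an event counts toward a condition
// only if it is no older than the condition's daysInterval, counted back
// from the start of the rebuild (beginTime).
static boolean inLookbackWindow(Event e, Condition condition, long beginTime) {
    return e.date.getTime() > beginTime - TimeUnit.DAYS.toMillis(condition.daysInterval);
}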
The whole run takes 1.5 to 2 hours, depending on the network load. Most of that time goes to database writes: about 75 minutes for Aerospike (executed in a single pipeline with the processing), and the rest for HBase and Mongo (35 minutes).
Batch processing code:
It is almost the same as the stream processing, but without the join. Instead, each event is checked against the list of conditions for its pixel_id. In the end this design needs less memory and runs faster.
JavaRDD<Tuple3<Object, String, Long>> av = HbaseUtil.getEventsHbaseScanRdd(jsc, hbaseConf, new Scan())
        .mapPartitions(it -> {
            ArrayList<Tuple3<Object, String, Long>> list = new ArrayList<>();
            it.forEachRemaining(e -> {
                String pixelId = e.pixelId;
                String vid = e.visitorId;
                long dt = e.date.getTime();
                List<Condition> cond = conditions.get(pixelId);
                if (cond != null) {
                    cond.stream()
                        // keep the event only if it falls inside the condition's lookback window
                        .filter(condition -> e.date.getTime() > beginTime - TimeUnit.DAYS.toMillis(condition.daysInterval)
                                && EventActionUtil.checkEvent(e, condition.condition))
                        .forEach(condition -> list.add(new Tuple3<>(condition.id, vid, dt)));
                }
            });
            return list;
        }).persist(StorageLevel.DISK_ONLY()).setName("RawVisitorAudience");
Saving to the databases
Saving from Kafka to HBase was originally part of the streaming service, but because crashes and failures do happen, we moved it into a separate application. Fault tolerance is implemented with the Kafka Reliable Receiver, which keeps us from losing data; checkpoints are used to save the metadata and the data itself.
HBase currently holds about 400 million records. All events are kept in the database for 180 days and are then removed by TTL.
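In HBase the TTL is a column-family setting; a sketch of such a table definition (the table name events and the family d are assumptions) could be:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import java.util.concurrent.TimeUnit;

// Hypothetical table definition: events expire 180 days after insertion.
HTableDescriptor table = new HTableDescriptor(TableName.valueOf("events"));
HColumnDescriptor family = new HColumnDescriptor("d");
family.setTimeToLive((int) TimeUnit.DAYS.toSeconds(180)); // TTL is in seconds
table.addFamily(family);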
Using the Reliable Receiver:
First, implement the create() method of the JavaStreamingContextFactory interface and, when creating the context, add the lines:

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
jssc.checkpoint(checkpointDir);

Then, instead of

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);

use

JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, contextFactory);

where contextFactory is an instance of that factory.
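A minimal sketch of such a factory (Spark 1.x JavaStreamingContextFactory, which getOrCreate falls back to when there is no checkpoint yet; variable names follow the earlier listings):

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

// getOrCreate() restores the context from the checkpoint directory if one
// exists; otherwise it calls create(), so all DStream wiring must live there.
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchInterval);
        jssc.checkpoint(checkpointDir);
        // stream creation and transformations go here
        return jssc;
    }
};
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, contextFactory);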
Saving to Aerospike is done with the native OutputFormat plus Lua scripts. To use the asynchronous client, we had to add two classes to the official connector (our fork).
Running a Lua script:
It asynchronously executes a function from the specified package.
public class UpdateListOutputFormat extends com.aerospike.hadoop.mapreduce.AerospikeOutputFormat<String, Bin> {
    private static final Log LOG = LogFactory.getLog(UpdateListOutputFormat.class);

    public static class LuaUdfRecordWriter extends AsyncRecordWriter<String, Bin> {
        public LuaUdfRecordWriter(Configuration cfg, Progressable progressable) {
            super(cfg, progressable);
        }

        @Override
        public void writeAerospike(String key, Bin bin, AsyncClient client,
                                   WritePolicy policy, String ns, String sn) throws IOException {
            try {
                policy.sendKey = true;
                Key k = new Key(ns, sn, key);
                Value name = Value.get(bin.name);
                Value value = bin.value;
                Value[] args = new Value[]{name, value, Value.get(System.currentTimeMillis() / 1000)};
                String packName = AeroUtil.getPackage(cfg);
                String funcName = AeroUtil.getFunction(cfg);
                // Execute lua script
                client.execute(policy, null, k, packName, funcName, args);
            } catch (Exception e) {
                LOG.error("Wrong put operation: \n" + e);
            }
        }
    }

    @Override
    public RecordWriter<String, Bin> getAerospikeRecordWriter(Configuration entries, Progressable progressable) {
        return new LuaUdfRecordWriter(entries, progressable);
    }
}
As an example, here is a function that appends new values to a list.
Lua script:
local split = function(str)
    local tbl = list()
    local start, fin = string.find(str, ",[^,]+$")
    list.append(tbl, string.sub(str, 1, start - 1))
    list.append(tbl, string.sub(str, start + 1, fin))
    return tbl
end

local save_record = function(rec, name, mp)
    local res = list()
    for k,v in map.pairs(mp) do
        list.append(res, k..","..v)
    end
    rec[name] = res
    if aerospike:exists(rec) then
        return aerospike:update(rec)
    else
        return aerospike:create(rec)
    end
end

function put_in_list_first_ts(rec, name, value, timestamp)
    local lst = rec[name]
    local mp = map()
    if value ~= nil then
        if list.size(value) > 0 then
            for i in list.iterator(value) do
                mp[i] = timestamp
            end
        end
    end
    if lst ~= nil then
        if list.size(lst) > 0 then
            for i in list.iterator(lst) do
                local sp = split(i)
                mp[sp[1]] = sp[2]
            end
        end
    end
    return save_record(rec, name, mp)
end
The script appends entries of the form "audience_id,timestamp" to the audience list. If an entry already exists, its timestamp is left unchanged (the first timestamp wins).
The servers the application runs on:
- Intel Xeon E5-1650, 6 cores @ 3.50 GHz (HT), 64 GB DDR3-1600;
- OS: CentOS 6;
- CDH version 5.4.0.
Application configuration:
In conclusion
On the way to this implementation we tried several options (Hadoop MapReduce, Pig, Spark) and ended up with a tool equally well suited to streaming tasks and to recalculating huge arrays of data. A partial implementation of the lambda architecture increased code reuse. The time for a full recalculation of all audiences dropped from tens of hours to tens of minutes. And horizontal scaling became easier than ever.
You can try our technology at any time on the Hybrid platform.
PS
Many thanks to Danila Perepechin for his invaluable help in writing this article.