-ãããã¯
-ãµãã¹ã¯ã©ã€ããŒïŒã³ã³ã·ã¥ãŒããŒïŒ
-åºç瀟ïŒãããã¥ãŒãµãŒïŒ
-ã°ã«ãŒããã°ã«ãŒããããŒãã£ã·ã§ã³
-ã¹ããªãŒã
ã«ãã«-åºæ¬
Kafkaãå匷ãããšãã質åã䜿ã£ãŠãäŸã䜿çšããŠå®éšçã«åãåãå¿ èŠãããçãããããŸãããããã¯ãã®èŠçŽã§æŠèª¬ãããŠããŸãã éå§æ¹æ³ãšéå§å Žæ以äžã®ãªã³ã¯ã®ãããããè³æã«èšèŒããŸãã
Apache Kafkaã¯ãJavaãã©ãããã©ãŒã ã®ã¡ãã»ãŒãžãããŒãžã£ãŒã§ãã Kafkaã«ã¯ã ãããªãã·ã£ãŒãã¡ãã»ãŒãžãæžã蟌ãã¡ãã»ãŒãžãµããžã§ã¯ããããããããã®ã¡ãã»ãŒãžãèªããããã¯ã«ãµãã¹ã¯ã©ã€ããŒãããŸãããã£ã¹ãããããã»ã¹ã®ãã¹ãŠã®ã¡ãã»ãŒãžã¯ãã£ã¹ã¯ã«æžã蟌ãŸããã³ã³ã·ã¥ãŒããŒã«äŸåããŸããã
Kafkaã«ã¯ãããŒããã»ã¯ã·ã§ã³ãæ¢è£œã®ãããªãã·ã£ãŒãç»é²è ãªã©ãäœæããããã®ãŠãŒãã£ãªãã£ã»ãããå«ãŸããŠããŸããKafkaãåäœãããã«ã¯ãZooKeeperã³ãŒãã£ããŒã¿ãŒãå¿ èŠã§ãã ïŒãããããã¡ã€ã«ã¯ããããã®binãã©ã«ããŒã«ããããŠãŒãã£ãªãã£ããããŸãã
ã«å ¥ããŠãŒãã£ãªãã£ã§KafkaããŒããäœæããŸã
kafka-topics.bat --create --zookeeper localhostïŒ2181 --replication-factor 1 --partitions 1 --topic out-topicããã§ã¯ãzookeeperãµãŒããŒã瀺ããreplication-factorã¯ã¡ãã»ãŒãžãã°ã¬ããªã«ã®æ°ãpartitionsã¯ãããã¯å ã®ã»ã¯ã·ã§ã³ã®æ°ïŒä»¥äžãåç §ïŒããããã¯èªäœã¯ãã¢ãŠããããã¯ãã§ãã
ç°¡åãªãã¹ãã®ããã«ãä»å±ã®kafka-console-consumerããã³kafka-console-producerã¢ããªã±ãŒã·ã§ã³ã䜿çšã§ããŸãããç§ã¯èªåã§ããã€ããã§ãã ãµãã¹ã¯ã©ã€ããŒã¯å®éã«ã¯ã°ã«ãŒãåãããŠããŸããããã«ãããããŸããŸãªã¢ããªã±ãŒã·ã§ã³ããããã¯ããã®ã¡ãã»ãŒãžã䞊è¡ããŠèªã¿åãããšãã§ããŸãã
åã¢ããªã±ãŒã·ã§ã³ã¯ç¬èªã®ãã¥ãŒãæã¡ãããããæåŸã«èªã¿åãããã¡ãã»ãŒãžïŒãªãã»ããïŒã®ãã€ã³ã¿ãŒã移åããŸããããã¯ã³ããããšåŒã°ããŸãã ãã®ããããããªãã·ã£ãŒããããã¯ã«ã¡ãã»ãŒãžãéä¿¡ããå Žåãå®è¡äžãŸãã¯æ¥ç¶ãããšããã«ããã®ãããã¯ã®åä¿¡è ã«ããèªã¿åããä¿èšŒãããŸãã ããã«ãåããããã¯ããèªã¿åãããç°ãªãã°ã«ãŒãã«ããç°ãªãã¯ã©ã€ã¢ã³ãïŒclient.idïŒãããå Žåããããã¯äºãã«é¢ä¿ãªããæºåãã§ãããšãã«ã¡ãã»ãŒãžãåä¿¡ããŸãã
ãããã£ãŠãã¡ãã»ãŒãžã®ãã©ãã¯ãŒãšã1ã€ã®ãããã¯ããã®æ¶è²»è ã«ããç¬ç«ããèªæžãæ³åã§ããŸãã
ãããããããã¯å ã®ã¡ãã»ãŒãžãéä¿¡ããããããæ©ãå°çãå§ããå ŽåããããŸãã æ¶è²»è ã¯ããããããé·ãåŠçããŸãã ãããè¡ãããã«ããããã¯ã¯ã»ã¯ã·ã§ã³ïŒããŒãã£ã·ã§ã³ïŒãæäŸãããã®ãããã¯ã®1ã€ã®ã°ã«ãŒãã§ã³ã³ã·ã¥ãŒããŒãå®è¡ã§ããŸãã
ãã®åŸãè² è·åæ£ãè¡ããããããã¯ããã³ã°ã«ãŒãå ã®ãã¹ãŠã®ã¡ãã»ãŒãžã1ã€ã®ã³ã³ã·ã¥ãŒããŒãééããããã§ã¯ãããŸããã ãããŠãã¡ãã»ãŒãžãã»ã¯ã·ã§ã³ã«åé ããæ¹æ³ãéžæãããŸãã ããã€ãã®æŠç¥ããããŸãïŒã©ãŠã³ãããã³-ããã¯ãããŒã®ããã·ã¥å€ã«ããåå ããŸãã¯æžã蟌ãã»ã¯ã·ã§ã³çªå·ã®æ瀺çãªæ瀺ã§ãã ãã®å Žåã®ãµãã¹ã¯ã©ã€ãã¯ãã»ã¯ã·ã§ã³å šäœã«åçã«åæ£ãããŸãã ããšãã°ãã°ã«ãŒãå ã®ã°ã«ãŒãããããµãã¹ã¯ã©ã€ããŒãå€ãå Žåãã ãããã¡ãã»ãŒãžãåä¿¡ããŸããã ãã®ããã«ããŠãã¹ã±ãŒã©ããªãã£ãæ¹åããããã®ã»ã¯ã·ã§ã³ãäœæãããŸãã
ããšãã°ã1ã€ã®ã»ã¯ã·ã§ã³ã§ãããã¯ãäœæããåŸã2ã€ã®ã»ã¯ã·ã§ã³ã«å€æŽããŸããã
kafka-topics.bat --zookeeper localhostïŒ2181 --alter --topic out-topic --partitions 21ã€ã®ãããã¯ã§1ã€ã®ã°ã«ãŒãã«ãããªãã·ã£ãŒãš2ã€ã®ãµãã¹ã¯ã©ã€ããŒãèµ·åããŸããïŒJavaããã°ã©ã ã®äŸã以äžã«ç€ºããŸãïŒã ã°ã«ãŒãåãšã¯ã©ã€ã¢ã³ãIDãæ§æããå¿ èŠã¯ãããŸãããKafkaããããåŠçããŸãã
my_kafka_run.cmd com.home.SimpleProducer out-topicïŒãããªãã·ã£ãŒïŒãã¢ã®ãããªãã·ã£ãŒã§ããŒã®å ¥åãéå§ãããšãå€ã¯èª°ãååŸããããããããŸãã ãããã£ãŠãããšãã°ãããŒããã·ã¥ã®é åžæŠç¥ã«åŸã£ãŠãã¡ãã»ãŒãžmïŒ1ã¯ã¯ã©ã€ã¢ã³ãclient01ã«å°éããŸããã
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01ïŒæåã®ãµãã¹ã¯ã©ã€ããŒïŒ
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client02ïŒ2çªç®ã®ãµãã¹ã¯ã©ã€ããŒïŒ
ã¡ãã»ãŒãžnïŒã¯ã©ã€ã¢ã³ãclient02ãžã®1
ãã¢ãæå®ããã«keyïŒvalueã®ãã¢ãå ¥åãå§ãããšïŒãããªãã·ã£ãŒã§ãã®æ©äŒãäœããŸããïŒããµãŒã¯ã«æŠç¥ãéžæãããŸãã æåã®ã¡ãã»ãŒãžãmãã¯client01ã«ããããããã§ã«3åclient02ã«ãããããŠããŸãã
ãããŠãäŸãã°ããã®åœ¢åŒã®ããŒã®ãããªã»ã¯ã·ã§ã³ãæã€å¥ã®ãªãã·ã§ã³ïŒå€ïŒããŒãã£ã·ã§ã³
以åãããã·ã¥æŠç¥ã§ã¯ãmïŒ1ã¯å¥ã®ã¯ã©ã€ã¢ã³ãïŒclient01ïŒã«ç§»åããçŸåšã¯ã»ã¯ã·ã§ã³ïŒNo. 1ã0ããçªå·ä»ãïŒã®æ瀺çãªæ瀺ã§client02ã«ç§»åããŠããŸãã
åããããã¯ã«å¯ŸããŠç°ãªãã°ã«ãŒãåtestGroup02ã§ãµãã¹ã¯ã©ã€ããŒãèµ·åãããšãã¡ãã»ãŒãžã¯ãµãã¹ã¯ã©ã€ããŒã«äžŠè¡ããŠç¬ç«ããŠéä¿¡ãããŸãã æåã®ãã®ãèªã¿ã2çªç®ã®ãã®ãã¢ã¯ãã£ãã§ãªãã£ãå Žåãã¢ã¯ãã£ãã«ãªããšããã«èªã¿ãŸãã
ããããã°ã«ãŒãããããã¯ã®èª¬æãèŠãããšãã§ããŸãïŒ
kafka-consumer-groups.bat --bootstrap-server localhostïŒ9092 --describe --group testGroup01
kafka-topics.bat --describe --zookeeper localhostïŒ2181 --topic out-topic
SimpleProducerã³ãŒã
public class SimpleProducer { public static void main(String[] args) throws Exception { // Check arguments length value if (args.length == 0) { System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); System.out.println("Producer topic=" + topicName); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer(props); BufferedReader br = null; br = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Enter key:value, q - Exit"); while (true) { String input = br.readLine(); String[] split = input.split(":"); if ("q".equals(input)) { producer.close(); System.out.println("Exit!"); System.exit(0); } else { switch (split.length) { case 1: // strategy by round producer.send(new ProducerRecord(topicName, split[0])); break; case 2: // strategy by hash producer.send(new ProducerRecord(topicName, split[0], split[1])); break; case 3: // strategy by partition producer.send(new ProducerRecord(topicName, Integer.valueOf(split[2]), split[0], split[1])); break; default: System.out.println("Enter key:value, q - Exit"); } } } } }
SimpleConsumerã³ãŒã
public class SimpleConsumer { public static void main(String[] args) throws Exception { if (args.length != 3) { System.out.println("Enter topic name, groupId, clientId"); return; } //Kafka consumer configuration settings final String topicName = args[0].toString(); final String groupId = args[1].toString(); final String clientId = args[2].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", groupId); props.put("client.id", clientId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)); //print the topic name System.out.println("Subscribed to topic=" + topicName + ", group=" + groupId + ", clientId=" + clientId); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); // looping until ctrl-c while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s, time = %s \n", record.offset(), record.key(), record.value(), sdf.format(new Date())); } } }
ããã°ã©ã ãå®è¡ããããã«ãmy_kafka_run.cmdãšããããããã¡ã€ã«ãäœæããŸãã
@echo off set CLASSPATH="C:\Project\myKafka\target\classes"; for %%i in (C:\kafka_2.11-1.1.0\libs\*) do ( call :concat "%%i" ) set COMMAND=java -classpath %CLASSPATH% %* %COMMAND% :concat IF not defined CLASSPATH ( set CLASSPATH="%~1" ) ELSE ( set CLASSPATH=%CLASSPATH%;"%~1" )
èµ·åäŸïŒ
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01
ã«ãã«ã¹ããªãŒã
ãã®ãããKafkaã®ãããŒã¯ãç¹å®ã®æäœãå€æãå®è¡ããåŸãããšãã°ãå¥ã®ãããã¯ã«çµæãè¿œå ããããäžè¬çã«ã©ãã«ã§ãããŒã¿ããŒã¹ã«ä¿åãããã§ãããããã¯ããååŸãããäžé£ã®ã€ãã³ãã§ãã æäœã¯ããã£ã«ã¿ãªã³ã°ïŒãã£ã«ã¿ãŒïŒãå€æïŒãããïŒããŸãã¯éçŽïŒã«ãŠã³ããåèšãå¹³åïŒã®ãããªãã®ã§ãã ãããè¡ãããã«ã察å¿ããã¯ã©ã¹KStreamãKTableããããŸããKTableã¯ãæ°ããã¡ãã»ãŒãžããããã¯ã«å°çãããšåžžã«æŽæ°ãããçŸåšã®éèšå€ãæã€ããŒãã«ãšããŠè¡šãããšãã§ããŸãã ããã¯ã©ãã§ããïŒ
ããšãã°ãçºè¡è ã¯ã€ãã³ãïŒã¡ãã»ãŒãžïŒã®ä»¶åã«æžã蟌ã¿ãKafkaã¯ãã¹ãŠã®ã¡ãã»ãŒãžãã¡ãã»ãŒãžãã°ã«ä¿åããŸããã¡ãã»ãŒãžãã°ã«ã¯ãããšãã°7æ¥éã®ä¿æããªã·ãŒããããŸãã ããšãã°ãçžå Žå€æŽã€ãã³ãã¯ã¹ããªãŒã ã§ãããããå¹³åå€ãèŠã€ããŠããããžã£ãŒãã«ããå±¥æŽãååŸããŠå¹³åãèšç®ããã¹ããªãŒã ãäœæããŸããããã§ãããŒã¯æ ªåŒã§ãããå€ã¯å¹³åã§ãïŒããã¯æ¢ã«ã¹ããŒã¿ã¹ã®ããããŒãã«ã§ãïŒã ããã«ã¯æ©èœããããŸã-ãã£ã«ã¿ãªã³ã°ãªã©ã®æäœãšã¯ç°ãªããéèšæäœã¯ãã®ç¶æ ãä¿æããŸãã ãããã£ãŠããµããžã§ã¯ãã«æ°ããå°çããã¡ãã»ãŒãžïŒã€ãã³ãïŒã¯èšç®ã®å¯Ÿè±¡ãšãªããçµæã¯ä¿åïŒç¶æ ã¹ãã¢ïŒããããã®åŸãæ°ããå°çããã¡ãã»ãŒãžã¯ãžã£ãŒãã«ã«æžã蟌ãŸããStreamã¯ããããåŠçããæ¢ã«ä¿åãããç¶æ ã«å€æŽãè¿œå ããŸãã ãã£ã«ã¿ãªã³ã°æäœã§ã¯ãç¶æ ãä¿åããå¿ èŠã¯ãããŸããã ãããŠãããã§ããåºç瀟ã«é¢ä¿ãªããã¹ããªãŒã ããããè¡ããŸãã ããšãã°ããããªãã·ã£ãŒã¯ã¡ãã»ãŒãžãæžã蟌ã¿ãããã°ã©ã -ã¹ããªãŒã ã¯ãã®æç¹ã§ã¯æ©èœãããäœã倱ãããããã¹ãŠã®ã¡ãã»ãŒãžã¯ãã°ã«ä¿åãããããã°ã©ã ã¹ããªãŒã ãã¢ã¯ãã£ãã«ãªããšããã«ãèšç®ãè¡ããç¶æ ãä¿åããèªã¿åãã¡ãã»ãŒãžã®ãªãã»ãããå®è¡ããŸããããã¯èªã¿åãããŸãïŒãå°æ¥çã«ã¯è¿ãããŸãããããã«ããããã®ã¡ãã»ãŒãžã¯ãžã£ãŒãã«ïŒkafka-logsïŒãé¢ããŸãã ããã§ã¯ãæããã«ãäž»ãªããšã¯ãã°ïŒkafka-logsïŒãšãã®ã¹ãã¬ãŒãžããªã·ãŒããããèš±å¯ããŠããããšã§ãã ããã©ã«ãã§ã¯ãKafka Streamã¯ç¶æ ãRocksDBã«ä¿åããŸãã ã¡ãã»ãŒãžãã°ãšããã«é¢é£ãããã¹ãŠïŒãããã¯ããªãã»ãããã¹ã¬ãããã¯ã©ã€ã¢ã³ããªã©ïŒã¯ãæ§æãã¡ã€ã«ãconfig \ server.propertiesãã®ãã©ã¡ãŒã¿ãŒãlog.dirs = kafka-logsãã§æå®ããããã¹ã«æ²¿ã£ãŠé 眮ããããã°ã¹ãã¬ãŒãžããªã·ãŒã瀺ãããŸããLog.retention.hours = 48ãã ãã°ã®äŸ
ãŸããã¹ããªãŒã ç¶æ ãå«ãããŒã¿ããŒã¹ãžã®ãã¹ã¯ãã¢ããªã±ãŒã·ã§ã³ãã©ã¡ãŒã¿ã§æå®ãããŸã
config.putïŒStreamsConfig.STATE_DIR_CONFIGã "CïŒ/kafka_2.11-1.1.0/state"ïŒ;ç¶æ ã¯ãã¢ããªã±ãŒã·ã§ã³ID ããšã«åå¥ã«ä¿åãããŸãïŒ StreamsConfig.APPLICATION_ID_CONFIG ïŒã äŸ
次ã«ãStreamã®åäœã確èªããŸãããã ãµã³ãã«ããStreamã¢ããªã±ãŒã·ã§ã³ãæºåããŸãããããã®ã¢ããªã±ãŒã·ã§ã³ã¯ãåãåèªã®æ°ãšã¢ããªã±ãŒã·ã§ã³ããããªãã·ã£ãŒããµãã¹ã¯ã©ã€ããŒãèæ ®ããé ä¿¡ïŒå®éšçšã®æ¹è¯ãå ãããã®ïŒã§ãã ãããã¯å ã®ãããã¯ã«æžã蟌ã¿ãŸã
my_kafka_run.cmd com.home.SimpleProducerã®ãããã¯Streamã¢ããªã±ãŒã·ã§ã³ã¯ãã®ãããã¯ãèªãã§åäžã®åèªã®æ°ãã«ãŠã³ãããŸããç¶æ ãç¶æããã¢ãŠããããã¯ãå¥ã®ãããã¯ã«ãªãã€ã¬ã¯ãããããšã¯æ瀺çã§ã¯ãããŸããã ããã§ããã°ãšç¶æ ïŒç¶æ ã¹ãã¢ïŒã®é¢ä¿ãæ確ã«ããŸãã ãããŠãZooKeeperãšKafkaãµãŒããŒãéå§ãããŸãã App-ID = app_01ã§ã¹ããªãŒã ãéå§ããŸã
my_kafka_run.cmd com.home.KafkaCountStreamã€ã³ãããã¯app_01ãããªãã·ã£ãŒãšãµãã¹ã¯ã©ã€ããŒããããã
my_kafka_run.cmd com.home.SimpleProducerã®ãããã¯ããã«ãããŸãïŒ
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01
åèªã®å ¥åãéå§ãããã®ã«ãŠã³ãã確èªããŸããã©ã®Stream App-IDãããããã«ãŠã³ããããã瀺ããŸã
äœæ¥ã¯ç¬ç«ããŠè¡ãããŸããã¹ããªãŒã ãåæ¢ããŠãããã¯ãžã®æžã蟌ã¿ãç¶ãããšãéå§æã«ã«ãŠã³ããããŸãã 次ã«ãApp-ID = app_02ïŒãããã¢ããªã±ãŒã·ã§ã³ã§ãããIDãç°ãªããŸãïŒã§2çªç®ã®ã¹ããªãŒã ãæ¥ç¶ãããã°ïŒä¿æããªã·ãŒã«åŸã£ãŠä¿åãããäžé£ã®ã€ãã³ãïŒãèªã¿åããæ°ãã«ãŠã³ãããç¶æ ãä¿åããŠçµæã衚瀺ããŸãã ãããã£ãŠãç°ãªãæç¹ã§åäœãéå§ãã2ã€ã®ã¹ã¬ããã¯åãçµæã«ãªããŸããã
ä»ãç§ãã¡ã®ãžã£ãŒãã«ãå€ããªã£ãŠããããšãèããŠã¿ãŸãããïŒä¿æããªã·ãŒïŒãŸãã¯ãããåé€ãïŒå®è¡ã§ããŸãïŒãApp-ID = app_03ã§3çªç®ã®ã¹ããªãŒã ãæ¥ç¶ããŸãïŒããã®ããã«Kafkaãåæ¢ããkafka-logsãåé€ããŠåã³éå§ããŸãïŒãããŠæ°ãããããã¯ãå ¥åããŸãæåã®ïŒapp_01ïŒã¹ã¬ãããã«ãŠã³ããç¶ããæ°ãã3çªç®ã®ã¹ã¬ããããŒãããéå§ãããããšã確èªããŠãã ããã
ãã®åŸãapp_02ã¹ããªãŒã ãå®è¡ãããšãæåã®ã¹ããªãŒã ã«è¿œãã€ããå€ãçãããªããŸãã ãã®äŸãããKafkaãçŸåšã®ãã°ãåŠçããæ¹æ³ã以åã«ä¿åããç¶æ ã«è¿œå ããæ¹æ³ãªã©ãæããã«ãªããŸããã
KafkaCountStreamã³ãŒã
public class KafkaCountStream { public static void main(final String[] args) throws Exception { // Check arguments length value if (args.length != 2) { System.out.println("Enter topic name, appId"); return; } String topicName = args[0]; String appId = args[1]; System.out.println("Count stream topic=" + topicName +", app=" + appId); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000); config.put(StreamsConfig.STATE_DIR_CONFIG, "C:/kafka_2.11-1.1.0/state"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream(topicName); // State store KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word) .count(); // out to another topic KStream<String, String> stringKStream = wordCounts.toStream() .map((k, v) -> new KeyValue<>(appId + "." + k, v.toString())); stringKStream.to("out-topic", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), config); // additional to complete the work final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { System.out.println("Kafka Stream close"); streams.close(); latch.countDown(); } }); try { System.out.println("Kafka Stream start"); streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.out.println("Kafka Stream exit"); System.exit(0); } }
ã«ãã«ã®ããŒãã¯éåžžã«åºç¯ã§ãããç§ã¯èªåèªèº«ã®ããã«æåã®äžè¬çãªãã¬ãŒã³ããŒã·ã§ã³ãè¡ããŸãã:-)
ææïŒ
éå§æ¹æ³ãšéå§å Žæ