This article will be most useful to readers who are familiar with Apache Kafka and stream processing.
In this article, perhaps the first in a series, I want to explain the concepts of streams and tables in stream processing, and specifically in Apache Kafka. Hopefully you will come away with a better theoretical understanding and with ideas that help you solve your current and future problems better, faster, or both.
Contents:
* Motivation
* Streams and Tables in Plain English
* Illustrated Examples
* Streams and Tables in Kafka English
* A Closer Look with Kafka Streams, KSQL, and Analogies in Scala
* Tables Stand on the Shoulders of Giants (Streams)
* Turning the Database Inside-Out
* Conclusion
Motivation, or: Why Should I Care?
In my daily work I talk with many Apache Kafka users, including users who do stream processing with Kafka through Kafka Streams and KSQL (streaming SQL for Kafka). Some of them come from a stream processing or Kafka background; others have experience with relational databases such as Oracle and MySQL.
A common question is: "What is the difference between streams and tables?" In this article I want to give both a short (TL;DR) and a long answer so that you can gain a deeper understanding. Some of the explanations below are slightly simplified, because that makes them easier to understand and remember (just as Newton's simpler model of gravity is perfectly sufficient for most everyday situations and saves you from jumping straight to Einstein's theory of relativity; fortunately, stream processing is never that complicated).
Another common question is: "Fine, but why should I care? How does this help me in my daily work?" In short: in many ways! Once you start using stream processing, you will soon realize that in practice most use cases require both streams and tables. As I will explain later, tables represent state. Whenever you do stateful processing, such as joins (for example, real-time data enrichment by joining a stream of facts with dimension tables) or aggregations (for example, computing 5-minute averages of key business metrics in real time), tables enter the streaming picture. If they don't, you are in for a lot of do-it-yourself pain.
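To make the enrichment case concrete, here is a minimal in-memory Java sketch. It uses plain JDK collections, not the Kafka Streams API, and the class, method, and sample data are hypothetical: each order in a stream of facts is looked up by key in a dimension table holding the current region of each user.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// In-memory analogy of joining a stream of facts with a dimension table.
// Hypothetical names; this is NOT the Kafka Streams join API.
class StreamTableJoinSketch {

    // Enriches each (key, fact) record with the table's current value for that key.
    // Records whose key is absent from the table are dropped (inner-join behavior).
    static List<String> enrich(List<Map.Entry<String, String>> facts,
                               Map<String, String> dimensionTable) {
        List<String> enriched = new ArrayList<>();
        for (Map.Entry<String, String> fact : facts) {
            String dimension = dimensionTable.get(fact.getKey());
            if (dimension != null) {
                enriched.add(fact.getKey() + " bought " + fact.getValue() + " in " + dimension);
            }
        }
        return enriched;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> orders = List.of(
            Map.entry("alice", "book"), Map.entry("bob", "lamp"), Map.entry("carol", "pen"));
        Map<String, String> regions = Map.of("alice", "EU", "bob", "APAC");
        // Prints "alice bought book in EU" and "bob bought lamp in APAC"; carol has no region.
        enrich(orders, regions).forEach(System.out::println);
    }
}
```

The table is consulted at the moment each fact arrives, which is why the join needs the table as state in the first place.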
Even the infamous WordCount example, probably the first "Hello World" you encountered in this area, falls into the stateful category: it is an example of stateful processing in which a stream of lines is aggregated into a continuously updated table/map of word counts. So whether you are implementing a simple streaming WordCount or something more sophisticated such as fraud detection, you want an easy-to-use stream processing solution with core data structures and all the necessary plumbing under the hood (hint: streams and tables). You certainly don't want to build an unnecessarily complex architecture in which you stitch a stream processing technology together with remote storage such as Cassandra or MySQL and, on top of that, probably add Hadoop/HDFS to get fault-tolerant processing (three things are too many).
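WordCount's stateful nature can be sketched in plain Java as well. In this hypothetical example (not Kafka code), every incoming line updates a map of word counts in place, so the map always reflects the current table:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// WordCount as stateful processing: a stream of text lines is aggregated
// into a continuously updated table (map) of word -> count.
class WordCountSketch {

    private final Map<String, Long> table = new HashMap<>();

    // Processes one incoming line and updates the table in place.
    void onLine(String line) {
        for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!word.isEmpty()) {
                table.merge(word, 1L, Long::sum);
            }
        }
    }

    // Returns an immutable snapshot of the table's current state.
    Map<String, Long> currentTable() {
        return Map.copyOf(table);
    }

    public static void main(String[] args) {
        WordCountSketch wordCount = new WordCountSketch();
        wordCount.onLine("hello kafka");
        wordCount.onLine("hello streams");
        System.out.println(wordCount.currentTable().get("hello")); // 2
    }
}
```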
Streams and Tables in Plain English
Here is the best analogy I can come up with:
- A stream in Kafka is the complete history of all events (or just business events) that have happened in the world from the beginning of time until today. It represents the past and the present. As we move from today to tomorrow, new events are constantly added to the history of the world.
- A table in Kafka is the state of the world today. It represents the present. It is an aggregation of all events in the world, and it changes constantly as we move from today to tomorrow.
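And, as an appetizer for a future post: if you have access to the complete history of events in the world (the stream), you can in general reconstruct the state of the world at any point in time, that is, the table at an arbitrary time $t$ in the stream, where $t$ is not restricted to $t = \text{now}$. In other words, you can create "snapshots" of the state of the world (the table) for any time $t$, for example around 2560 BCE, when the Great Pyramid of Giza was built, or the founding of the European Union.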
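The appetizer can be tried out with a small Java sketch, assuming that events carry simple numeric timestamps and that the history is sorted by time (the Event class and the tableAt method are illustrative names, not part of any Kafka API). Replaying the history up to $t$ yields the table as of $t$:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rebuilding the table (state of the world) at an arbitrary time t from the
// stream (full event history). Timestamps are plain longs, assumed sorted.
class SnapshotSketch {

    // One historical event: at time `ts`, `key` took on `value`.
    static final class Event {
        final long ts;
        final String key;
        final String value;

        Event(long ts, String key, String value) {
            this.ts = ts;
            this.key = key;
            this.value = value;
        }
    }

    // Replays every event with ts <= t and keeps the latest value per key.
    static Map<String, String> tableAt(List<Event> history, long t) {
        Map<String, String> table = new HashMap<>();
        for (Event e : history) {
            if (e.ts > t) {
                break; // history is sorted, so no later event can be relevant
            }
            table.put(e.key, e.value);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> history = List.of(
            new Event(1, "alice", "Paris"), new Event(2, "bob", "Sydney"),
            new Event(3, "alice", "Rome"), new Event(4, "bob", "Lima"),
            new Event(5, "alice", "Berlin"));
        System.out.println(tableAt(history, 3).get("alice")); // Rome
        System.out.println(tableAt(history, 5).get("alice")); // Berlin
    }
}
```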
Illustrated Examples
The first example shows a stream of user geolocation updates that is aggregated into a table tracking the current (latest) location of each user. As I will explain later, this also happens to be the default table semantics when you read a Kafka topic directly into a table.
The second use case shows the same stream of user geolocation updates, but now the stream is aggregated into a table that records the number of locations each user has visited. Because the aggregation function is different (here: counting), the contents of the table are different. More precisely, the values per key are different.
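Both examples can be expressed with one generic per-key aggregation in Java. The helper below is a hypothetical in-memory sketch that loosely mirrors the initializer/aggregator idea; it is not a Kafka Streams signature. Only the aggregation function changes between the two tables:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Same input stream, two different aggregation functions, two different tables.
class TwoAggregationsSketch {

    // Generic per-key aggregation: `initializer` supplies the starting aggregate for a
    // key seen for the first time, `aggregator` folds each new value into it.
    static <V, A> Map<String, A> aggregate(List<Map.Entry<String, V>> stream,
                                           Supplier<A> initializer,
                                           BiFunction<A, V, A> aggregator) {
        Map<String, A> table = new HashMap<>();
        for (Map.Entry<String, V> record : stream) {
            A previous = table.containsKey(record.getKey())
                ? table.get(record.getKey())
                : initializer.get();
            table.put(record.getKey(), aggregator.apply(previous, record.getValue()));
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> locations = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Sydney"), Map.entry("alice", "Rome"),
            Map.entry("bob", "Lima"), Map.entry("alice", "Berlin"));

        // Example 1: latest location per user (the previous aggregate is ignored).
        Map<String, String> latest = aggregate(locations, () -> null, (prev, loc) -> loc);
        // Example 2: number of visited locations per user.
        Map<String, Long> visited = aggregate(locations, () -> 0L, (prev, loc) -> prev + 1);

        System.out.println(latest.get("alice"));  // Berlin
        System.out.println(visited.get("alice")); // 3
    }
}
```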
Streams and Tables in Kafka English
Before we dive into the details, let's start with a simple summary.
A topic in Kafka is an unbounded sequence of key-value pairs. Keys and values are raw byte arrays, i.e. <byte[], byte[]>.
A stream is a topic with a schema. Keys and values are no longer byte arrays but have specific types.
Example: a <byte[], byte[]> topic is read as a <User, GeoLocation> stream of user geolocations.
A table is, well, a table in the ordinary sense of the word (I can sense the joy of those of you who already know RDBMSs and are only just getting to know Kafka). But seen through the lens of stream processing, a table is also an aggregated stream (you didn't really expect us to stop at "a table is a table", did you?).
Example: a <User, GeoLocation> stream of geolocation updates is aggregated into a <User, GeoLocation> table that tracks the latest location of each user. The aggregation step UPSERTs the values in the table by key from the input stream. We saw this in the first example above.
Example: a <User, GeoLocation> stream is aggregated into a <User, Long> table that tracks the number of locations visited by each user. The aggregation step continuously computes (and updates) the value per key in the table. We saw this in the second example above.
Summary:
Topics, streams, and tables have the following properties in Kafka:
Concept | Partitioned | Unbounded | Ordered | Mutable | Unique key | Schema |
---|---|---|---|---|---|---|
Topic | Yes | Yes | Yes | No | No | No |
Stream | Yes | Yes | Yes | No | No | Yes |
Table | Yes | Yes | No | Yes | Yes | Yes |
Let's look at how topics, streams, and tables relate to the Kafka Streams API and KSQL, and also draw analogies with programming languages (the analogies ignore, for example, that topics/streams/tables are partitioned):
Concept | Kafka Streams | KSQL | Java | Scala | Python |
---|---|---|---|---|---|
Topic | - | - | List / Stream | List / Stream[(Array[Byte], Array[Byte])] | [] |
Stream | KStream | STREAM | List / Stream | List / Stream[(K, V)] | [] |
Table | KTable | TABLE | HashMap | mutable.Map[K, V] | {} |
But a summary at this level may be of little help. So let's take a closer look.
A Closer Look with Kafka Streams, KSQL, and Analogies in Scala
I'll start each of the following sections with a Scala analogy (think of stream processing on a single machine) in the Scala REPL, so that you can copy the code and play with it yourself. Then I'll explain how to do the same in Kafka Streams and KSQL (elastic, scalable, fault-tolerant stream processing on distributed machines). As I mentioned at the beginning, I slightly simplify the explanations below. For example, I won't consider the impact of partitioning in Kafka.
If you don't know Scala: don't be put off! You don't need to understand the Scala analogies in every detail. It is enough to pay attention to which operations (for example, map()) are chained together, what these operations represent (for example, reduceLeft() represents an aggregation), and how the stream "chain" compares to the table "chain".
Topics
A topic in Kafka consists of key-value messages. The topic is agnostic to the serialization format or "type" of its messages: message keys and values are treated universally as plain byte[] arrays. In other words, at this point we have no idea what is in the data.
Kafka Streams and KSQL have no concept of a "topic". They only know about streams and tables. That's why I show only the Scala analogy for a topic here.

    // Scala analogy: keys and values are the raw bytes of
    // alice/Paris, bob/Sydney, alice/Rome, bob/Lima, alice/Berlin
    scala> val topic: Seq[(Array[Byte], Array[Byte])] = Seq(
         |   (Array(97, 108, 105, 99, 101), Array(80, 97, 114, 105, 115)),
         |   (Array(98, 111, 98), Array(83, 121, 100, 110, 101, 121)),
         |   (Array(97, 108, 105, 99, 101), Array(82, 111, 109, 101)),
         |   (Array(98, 111, 98), Array(76, 105, 109, 97)),
         |   (Array(97, 108, 105, 99, 101), Array(66, 101, 114, 108, 105, 110)))
Streams
We now read the topic into a stream by adding information about the schema (schema-on-read). In other words, we turn the raw, untyped topic into a "typed topic", i.e. a stream.
Schema-on-read vs. schema-on-write: Kafka and its topics are agnostic to the serialization format of the data. That's why you must specify a schema whenever you read data into a stream or table. This is called schema-on-read. Schema-on-read has both advantages and disadvantages. Fortunately, you can choose a middle ground between schema-on-read and schema-on-write by defining a contract for your data, just as you define API contracts in your applications and services. You can achieve this by choosing a structured yet evolvable data format such as Apache Avro, combined with a deployed registry for your Avro schemas such as Confluent Schema Registry. And in case you're wondering: yes, Kafka Streams and KSQL both support Avro.
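Here is a small Java sketch of schema-on-read, under the assumption that a "schema" is simply a pair of deserializer functions supplied by the reader (the read helper is hypothetical; in Kafka Streams this role is played by the serdes passed to Consumed.with()). In this example the values are assumed to be 8-byte big-endian longs:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Schema-on-read: the topic holds raw bytes, and the reader supplies the schema
// (here, a pair of deserializer functions) when turning it into a typed stream.
class SchemaOnReadSketch {

    static <K, V> List<Map.Entry<K, V>> read(List<Map.Entry<byte[], byte[]>> topic,
                                             Function<byte[], K> keySchema,
                                             Function<byte[], V> valueSchema) {
        List<Map.Entry<K, V>> stream = new ArrayList<>();
        for (Map.Entry<byte[], byte[]> record : topic) {
            stream.add(Map.entry(keySchema.apply(record.getKey()), valueSchema.apply(record.getValue())));
        }
        return stream;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] int64(long n) {
        return ByteBuffer.allocate(Long.BYTES).putLong(n).array();
    }

    public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> topic = List.of(
            Map.entry(utf8("alice"), int64(3L)), Map.entry(utf8("bob"), int64(2L)));

        Function<byte[], String> asString = b -> new String(b, StandardCharsets.UTF_8);
        Function<byte[], Long> asLong = b -> ByteBuffer.wrap(b).getLong();

        // Matching schema: readable, typed records.
        System.out.println(read(topic, asString, asLong)); // [alice=3, bob=2]
        // Wrong value schema: the topic does not object, the result is just meaningless.
        System.out.println(read(topic, asString, asString));
    }
}
```

The second read illustrates the downside of pure schema-on-read: nothing in the topic prevents a reader from applying the wrong schema, which is what a schema registry and data contracts help guard against.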
In Scala, this is done with the map() operation below. In this example, we end up with a stream of <String, String> pairs. Notice how we can now see what is inside the data.

    // Scala analogy
    scala> val stream = topic
         |   .map { case (k: Array[Byte], v: Array[Byte]) => new String(k) -> new String(v) }

    // => stream: Seq[(String, String)] =
    //      List((alice,Paris), (bob,Sydney), (alice,Rome), (bob,Lima), (alice,Berlin))
In Kafka Streams, you read a topic into a KStream via StreamsBuilder#stream(). Here, you must define the desired schema via the Consumed.with() parameter when reading the data from the topic:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
In KSQL, you would do something like the following to read a topic as a STREAM. Here, you define the desired schema by specifying column names and types when reading the data from the topic:

    CREATE STREAM myStream (username VARCHAR, location VARCHAR)
      WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='...');
Tables
Now we read the same topic into a table. First, we need to add information about the schema (schema-on-read). Second, we need to convert the stream into a table. The table semantics in Kafka say that the resulting table must map every message key in the topic to the latest value for that key.
Let's use the first example, where the resulting table tracks the latest location of each user.
In Scala:

    // Scala analogy
    scala> val table = topic
         |   .map { case (k: Array[Byte], v: Array[Byte]) => new String(k) -> new String(v) }
         |   .groupBy(_._1)
         |   .map { case (k, v) => (k, v.reduceLeft( (aggV, newV) => newV)._2) }

    // => table: scala.collection.immutable.Map[String,String] =
    //      Map(alice -> Berlin, bob -> Lima)
Adding schema information is done by the first map() operation, just as in the stream example above. The stream is converted into a table by an aggregation step (more on this later), which in this case represents a (stateless) UPSERT operation on the table: this is the groupBy().map() step, which contains a per-key reduceLeft() operation. Aggregation means that, for each key, many values are squashed into a single value. Note that this particular reduceLeft() aggregation is stateless: the previous value aggV is not used to compute the new value for a given key.
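For readers more at home in Java than Scala, the groupBy()/reduceLeft() step maps fairly directly onto Collectors.toMap with a merge function, which, for a sequential stream, receives the previous value (aggV) and the new value (newV) in encounter order. This is a JDK-only sketch with hypothetical names; the second method shows a stateful reducer for contrast:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Java analogy of the per-key reduce step: the merge function of Collectors.toMap
// plays the role of the reducer (aggV = previous aggregate, newV = incoming value).
class ReduceSketch {

    // Stateless UPSERT reducer: aggV is ignored, the newest value wins.
    static Map<String, String> latest(List<Map.Entry<String, String>> stream) {
        return stream.stream().collect(
            Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (aggV, newV) -> newV));
    }

    // Stateful reducer for contrast: the new aggregate is built from aggV and newV.
    static Map<String, String> trail(List<Map.Entry<String, String>> stream) {
        return stream.stream().collect(
            Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (aggV, newV) -> aggV + " -> " + newV));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Sydney"), Map.entry("alice", "Rome"),
            Map.entry("bob", "Lima"), Map.entry("alice", "Berlin"));
        System.out.println(latest(stream).get("alice")); // Berlin
        System.out.println(trail(stream).get("alice"));  // Paris -> Rome -> Berlin
    }
}
```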
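What is interesting about the relationship between streams and tables is that the command above creates a table equivalent to the shorter variant below (think: referential transparency). There we build the table directly from the stream, which lets us skip the type/schema definition because the stream is already typed. We can now see that a table is a derivation, an aggregation, of a stream: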

    // Scala analogy, simplified
    scala> val table = stream
         |   .groupBy(_._1)
         |   .map { case (k, v) => (k, v.reduceLeft( (aggV, newV) => newV)._2) }

    // => table: scala.collection.immutable.Map[String,String] =
    //      Map(alice -> Berlin, bob -> Lima)
In Kafka Streams, you would normally use StreamsBuilder#table() to read a Kafka topic into a KTable with a simple one-liner:

    KTable<String, String> table =
        builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
But, for the sake of illustration, you can also first read the topic into a KStream and then explicitly perform the same aggregation step shown above to turn the KStream into a KTable:

    KStream<String, String> stream = ...;
    KTable<String, String> table = stream
        .groupByKey()
        .reduce((aggV, newV) -> newV);
In KSQL, you would do something like the following to read a topic as a TABLE. Here, you define the desired schema by specifying column names and types when reading from the topic:

    CREATE TABLE myTable (username VARCHAR, location VARCHAR)
      WITH (KAFKA_TOPIC='input-topic', KEY='username', VALUE_FORMAT='...');
What does this mean? It means that a table is actually an aggregated stream, just as we said at the very beginning. We saw this first-hand in the special case above, where a table is created directly from a topic. But it turns out that this is actually the general case.
Tables Stand on the Shoulders of Giants (Streams)
Conceptually, the stream is the only first-order data structure in Kafka. A table, on the other hand, is either (1) derived from an existing stream through per-key aggregation, or (2) derived from an existing table, whose lineage can always be traced back to an aggregated stream (we could call the latter the tables' "ur-stream").
Tables are also often described as a materialized view of a stream. In this context, a view of a stream is nothing more than an aggregation.
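Both derivation paths can be sketched with plain Java maps (hypothetical helpers, not the KTable API): case (1) aggregates a stream into a count table, and case (2) derives a second table from that table, so its lineage still traces back to the original stream.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DerivedTableSketch {

    // Case (1): a table derived from a stream by per-key aggregation (visited-location count).
    static Map<String, Long> countTable(List<Map.Entry<String, String>> stream) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, String> record : stream) {
            table.merge(record.getKey(), 1L, Long::sum);
        }
        return table;
    }

    // Case (2): a table derived from another table by mapping each value.
    // Its lineage still leads back to the stream that countTable aggregated.
    static Map<String, String> travelerLevel(Map<String, Long> counts) {
        Map<String, String> table = new HashMap<>();
        counts.forEach((user, n) -> table.put(user, n >= 3 ? "frequent" : "occasional"));
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Sydney"), Map.entry("alice", "Rome"),
            Map.entry("bob", "Lima"), Map.entry("alice", "Berlin"));
        System.out.println(travelerLevel(countTable(stream)).get("alice")); // frequent
    }
}
```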
Of the two cases, (1) is the more interesting one to discuss, so let's focus on it. This probably means I first need to shed some light on how aggregations work in Kafka.
Aggregations in Kafka
Aggregation is one type of stream processing. Other types include, for example, filters and joins.
As we learned at the beginning, data in Kafka is represented as key-value pairs. The first characteristic of aggregations in Kafka is that they are all computed per key. That's why, in Kafka Streams, we must group a KStream via groupBy() or groupByKey() before the actual aggregation step. For the same reason, we had to use groupBy() in the Scala examples above.
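The difference between grouping by the existing key and grouping by a newly selected key can be sketched with Collectors.groupingBy (a JDK analogy with hypothetical names, not Kafka Streams code). In Kafka Streams, choosing a new key generally means the data has to be repartitioned by that key, which this in-memory sketch does not model:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class GroupingSketch {

    // Analogous to groupByKey(): keep each record's existing key, then count per key.
    static Map<String, Long> countByExistingKey(List<Map.Entry<String, String>> stream) {
        return stream.stream()
            .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }

    // Analogous to groupBy(selector): derive a new grouping key first, then count per new key.
    static <K> Map<K, Long> countBy(List<Map.Entry<String, String>> stream,
                                    Function<Map.Entry<String, String>, K> keySelector) {
        return stream.stream().collect(Collectors.groupingBy(keySelector, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> visits = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Paris"), Map.entry("alice", "Rome"));
        System.out.println(countByExistingKey(visits).get("alice"));          // 2 (visits per user)
        System.out.println(countBy(visits, r -> r.getValue()).get("Paris"));  // 2 (visitors per city)
    }
}
```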
Partitions and message keys: an equally important aspect of Kafka that I ignore in this article is that topics, streams, and tables are partitioned. In reality, data is processed and aggregated per key per partition. By default, messages/records are distributed across partitions based on their keys, so in practice the simplification "aggregated per key" instead of the technically more complex and more accurate "aggregated per key per partition" is perfectly valid. If you use a custom partitioning algorithm, however, you must account for this in your processing logic.
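Here is a toy Java model of key-based partitioning, assuming String.hashCode() as the hash (Kafka's default partitioner hashes the serialized key bytes with murmur2 instead; all names here are hypothetical). It shows why "per key per partition" collapses to "per key" when records are partitioned by key: each key lands in exactly one partition, so the per-partition tables never overlap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionSketch {

    // ASSUMPTION: String.hashCode() stands in for Kafka's murmur2-based default partitioner.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Aggregates (counts) per key separately inside each partition.
    static List<Map<String, Long>> countPerKeyPerPartition(List<Map.Entry<String, String>> stream,
                                                           int numPartitions) {
        List<Map<String, Long>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
        for (Map.Entry<String, String> record : stream) {
            partitions.get(partitionFor(record.getKey(), numPartitions))
                      .merge(record.getKey(), 1L, Long::sum);
        }
        return partitions;
    }

    // Unions the per-partition tables; fails if any key was aggregated in two partitions.
    static Map<String, Long> union(List<Map<String, Long>> partitions) {
        Map<String, Long> all = new HashMap<>();
        for (Map<String, Long> partition : partitions) {
            partition.forEach((key, count) -> {
                if (all.putIfAbsent(key, count) != null) {
                    throw new IllegalStateException("key found in two partitions: " + key);
                }
            });
        }
        return all;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Sydney"), Map.entry("alice", "Rome"),
            Map.entry("bob", "Lima"), Map.entry("alice", "Berlin"));
        Map<String, Long> total = union(countPerKeyPerPartition(stream, 3));
        System.out.println(total.equals(Map.of("alice", 3L, "bob", 2L))); // true
    }
}
```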
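The second characteristic of aggregations in Kafka is that they are continuously updated as soon as new data arrives in the input streams. Together with the per-key computation, this requires a table or, more precisely, a mutable table as the output and therefore as the return type of aggregations. The previous value (aggregation result) for a key is continuously overwritten with the newer value. In both Kafka Streams and KSQL, aggregations always return a table.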
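Some aggregations need more state than the result itself. The sketch below (a hypothetical class in plain Java) keeps a running average per key: because an average cannot be updated from the previous average alone, the mutable table stores a sum and a count per key and updates them as each new value arrives.

```java
import java.util.HashMap;
import java.util.Map;

class RunningAverageSketch {

    // Per-key aggregate state: an average needs both the running sum and the count.
    static final class SumCount {
        double sum;
        long count;
    }

    private final Map<String, SumCount> table = new HashMap<>();

    // Folds a new value into the key's aggregate as soon as it arrives and returns the new average.
    double onRecord(String key, double value) {
        SumCount agg = table.computeIfAbsent(key, k -> new SumCount());
        agg.sum += value;
        agg.count++;
        return agg.sum / agg.count;
    }

    public static void main(String[] args) {
        RunningAverageSketch averages = new RunningAverageSketch();
        System.out.println(averages.onRecord("checkout", 10.0)); // 10.0
        System.out.println(averages.onRecord("checkout", 20.0)); // 15.0
        System.out.println(averages.onRecord("search", 4.0));    // 4.0
    }
}
```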
Let's go back to the second example, where we count the number of locations visited by each user in our stream.
Counting is a type of aggregation. To compute the count values, we only need to replace the aggregation step from the previous section, .reduce((aggV, newV) -> newV), with .map { case (k, v) => (k, v.length) }. Note that the return type is a table/map (and please ignore the fact that, in the Scala code, the map is immutable, because Scala uses immutable maps by default).

    // Scala analogy
    scala> val visitedLocationsPerUser = stream
         |   .groupBy(_._1)
         |   .map { case (k, v) => (k, v.length) }

    // => visitedLocationsPerUser: scala.collection.immutable.Map[String,Int] =
    //      Map(alice -> 3, bob -> 2)
The Kafka Streams code equivalent to the Scala example above:

    KTable<String, Long> visitedLocationsPerUser = stream
        .groupByKey()
        .count();
In KSQL:

    CREATE TABLE visitedLocationsPerUser AS
      SELECT username, COUNT(*)
      FROM myStream
      GROUP BY username;
Tables Are Aggregated Streams (Input Stream → Table)
As we saw above, tables are aggregations of their input streams or, in short, tables are aggregated streams. Whenever you perform an aggregation in Kafka Streams or KSQL, the result is always a table.
The characteristics of the aggregation step determine whether the table is derived directly from a stream via stateless UPSERT semantics (the table maps keys to their latest value in the stream, which is the aggregation used when reading a Kafka topic directly into a table), via stateful counting of the number of values seen per key (see the last example), or via more complex aggregations such as summing, averaging, and so on. With Kafka Streams and KSQL you have many options for aggregations, including windowed aggregations with tumbling windows, hopping windows, and session windows.
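A windowed aggregation can be sketched by making the window part of the key. This minimal Java example assumes 5-minute tumbling windows aligned to timestamp 0 and non-negative millisecond timestamps; the method names and the "key@windowStart" encoding are illustrative only, not how Kafka Streams represents windowed keys:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // 5-minute tumbling windows

    // Start of the window that contains ts; windows are fixed-size, non-overlapping,
    // and aligned to timestamp 0 (assumes ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Counts events per (key, window). Each input entry is (key, timestampMs).
    static Map<String, Long> countPerWindow(List<Map.Entry<String, Long>> events) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> event : events) {
            String windowedKey = event.getKey() + "@" + windowStart(event.getValue());
            table.merge(windowedKey, 1L, Long::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> events = List.of(
            Map.entry("alice", 0L), Map.entry("alice", 60_000L), Map.entry("alice", 299_999L),
            Map.entry("alice", 300_000L), Map.entry("bob", 10_000L));
        Map<String, Long> counts = countPerWindow(events);
        System.out.println(counts.get("alice@0"));      // 3
        System.out.println(counts.get("alice@300000")); // 1
    }
}
```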
Tables Have Changelog Streams (Table → Output Stream)
While a table is an aggregation of its input stream, it also has its own output stream! Similar to change data capture (CDC) in databases, every change to a table in Kafka is captured in an internal stream of changes called the table's changelog stream. Many computations in Kafka Streams and KSQL are actually performed on the changelog stream. This allows Kafka Streams and KSQL, for example, to correctly reprocess historical data according to event-time processing semantics. Remember that a stream represents both the present and the past, whereas a table can only represent the present (or, more precisely, a snapshot in time).
Note: in Kafka Streams, you can explicitly convert a table into a changelog stream via KTable#toStream().
Here is the first example again, this time with the changelog stream:
Note that the table's changelog stream is a copy of the table's input stream. That's due to the nature of the corresponding aggregation function (UPSERT). And if you're wondering, "Wait, isn't this 1:1 copy a waste of disk space?": Kafka Streams and KSQL perform optimizations under the hood to minimize unnecessary data copies and local/network I/O. I ignore these optimizations in the diagram above to better illustrate what is happening in principle.
And finally, the second use case, including the changelog stream. Here, the table's changelog stream is different because the aggregation function, which counts per key, is different.
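The two changelog streams can be reproduced with a short Java sketch (hypothetical names, no Kafka involved): the table appends (key, new value) to its changelog on every update. With an UPSERT aggregator the changelog equals the input stream, while with a counting aggregator it does not:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class ChangelogSketch {

    // Applies `aggregator` per key and records every resulting table change (key, new value)
    // in the returned changelog. The aggregator receives null for a key's first value.
    static <V> List<Map.Entry<String, V>> changelog(List<Map.Entry<String, String>> input,
                                                    BiFunction<V, String, V> aggregator) {
        Map<String, V> table = new HashMap<>();
        List<Map.Entry<String, V>> log = new ArrayList<>();
        for (Map.Entry<String, String> record : input) {
            V updated = aggregator.apply(table.get(record.getKey()), record.getValue());
            table.put(record.getKey(), updated);
            log.add(Map.entry(record.getKey(), updated));
        }
        return log;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Sydney"), Map.entry("alice", "Rome"),
            Map.entry("bob", "Lima"), Map.entry("alice", "Berlin"));

        // UPSERT: the changelog is a copy of the input stream.
        List<Map.Entry<String, String>> upsertLog = changelog(input, (prev, location) -> location);
        // Counting: the changelog carries the updated counts instead.
        List<Map.Entry<String, Long>> countLog =
            changelog(input, (prev, location) -> prev == null ? 1L : prev + 1);

        System.out.println(upsertLog.equals(input)); // true
        System.out.println(countLog); // [alice=1, bob=1, alice=2, bob=2, alice=3]
    }
}
```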
But these internal changelog streams also have architectural and operational implications. Changelog streams are continuously backed up and stored as topics in Kafka, and are thus part of the magic that gives Kafka Streams and KSQL their elasticity and fault tolerance. That's because they allow processing tasks to be moved between machines/VMs/containers without data loss and during live operation, regardless of whether the processing is stateful or stateless. A table is part of the state of your application (Kafka Streams) or query (KSQL), so Kafka must be able to move not only the processing code (easy) but also the processing state, including tables, between machines quickly and reliably (much harder). Whenever a table needs to be moved from client machine A to machine B, what happens behind the scenes is that, at the new destination B, the table is rebuilt from its changelog stream in Kafka (server side) into exactly the state it had on machine A. We can see this in the last diagram above, where the "counting table" can easily be restored from its changelog stream without reprocessing the input stream.
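The restore step can be modeled in a few lines of Java, under the simplifying assumption that the changelog is an in-memory list rather than a Kafka topic (all names are hypothetical). Machine B replays the changelog with plain upserts and ends up with the same table as machine A, without reprocessing the input or knowing the aggregation function:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestoreSketch {

    // "Machine A": counts records per key and appends every table change to `changelog`.
    static Map<String, Long> processOnA(List<Map.Entry<String, String>> input,
                                        List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, String> record : input) {
            Long updated = table.merge(record.getKey(), 1L, Long::sum);
            changelog.add(Map.entry(record.getKey(), updated));
        }
        return table;
    }

    // "Machine B": rebuilds the table by replaying the changelog (later entries overwrite
    // earlier ones), without touching the input stream or knowing the aggregation function.
    static Map<String, Long> restoreOnB(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> change : changelog) {
            table.put(change.getKey(), change.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
            Map.entry("alice", "Paris"), Map.entry("bob", "Sydney"), Map.entry("alice", "Rome"),
            Map.entry("bob", "Lima"), Map.entry("alice", "Berlin"));
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();
        Map<String, Long> onA = processOnA(input, changelog);
        Map<String, Long> onB = restoreOnB(changelog);
        System.out.println(onA.equals(onB)); // true
    }
}
```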
The Stream-Table Duality
The term stream-table duality refers to the above relationship between streams and tables. It means, for example, that you can turn a stream into a table, that table into another stream, the resulting stream into another table, and so on. For more details, see Confluent's blog post Introducing Kafka Streams: Stream Processing Made Simple.
Turning the Database Inside-Out
In addition to what we covered in the previous sections, you may have come across the article Turning the Database Inside-Out and may now want to look at the whole picture. Since I don't want to go into too much detail here, let me briefly compare the world of Kafka and stream processing with the world of databases. Be warned: black-and-white simplifications ahead.
In databases, the table is the first-order construct. It is what you work with. "Streams" also exist in databases, for example MySQL's binlog or Oracle's GoldenGate, but they are usually hidden from you in the sense that you cannot interact with them directly. A database knows about the present, but it does not know about the past (if you need the past, restore the data from your backup tapes, which, ha ha, are just hardware streams).
In Kafka and stream processing, the stream is the first-order construct. As we saw earlier, tables are derivations of streams. The stream knows about both the present and the past. As an example, The New York Times stores all of its published articles, 160 years of journalism dating back to the 1850s, in Kafka, its source of truth.
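In short: a database thinks of tables first and streams second. Kafka thinks of streams first and tables second. That said, the Kafka community has realized that in practice most streaming use cases require both streams and tables, even the infamous yet simple WordCount, which aggregates a stream of text lines into a table of word counts, like our second example above. Kafka therefore helps bridge the worlds of stream processing and databases by providing native support for streams and tables via Kafka Streams and KSQL, sparing us a lot of DIY pain (and pager alerts). We might therefore call Kafka, and the type of streaming platform it represents, stream-relational rather than stream-only.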
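Conclusion
I hope these explanations help you better understand streams and tables in Kafka and in stream processing in general. Now that we have finished our closer look, you might want to go back to the beginning of the article and reread the sections "Streams and Tables in Plain English" and "Streams and Tables in Kafka English" once more.
If this article has made you curious to try stream-relational processing with Kafka, Kafka Streams, and KSQL, you might want to continue with:
- Learning how to use KSQL, the streaming SQL engine for Kafka, to process your data in Kafka without writing any code. That's what I'd recommend as a starting point, especially if you are new to Kafka or stream processing, since you should be up and running within minutes. There is also a cool KSQL clickstream demo (including a Docker variant) where you can play with a Kafka, KSQL, Elasticsearch, and Grafana setup that drives a real-time dashboard.
- Learning how to build Java or Scala stream processing applications with the Kafka Streams API.
- And yes, you can of course combine the two: for example, start processing your data with KSQL, continue with Kafka Streams, and then follow up with KSQL again.
Regardless of whether you use Kafka Streams or KSQL, thanks to Kafka you benefit from elastic, scalable, fault-tolerant, distributed stream processing that runs anywhere (containers, virtual machines, physical machines, locally, on-premises, in the cloud, you name it). Just saying, in case it isn't obvious. :-)
Finally, I titled this article "Streams and Tables, Part 1". And while I already have ideas for Part 2, I'd welcome questions and suggestions about what to cover next. What would you like to learn more about? Let me know!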