Apache Kafka-私の抂芁

これは私の抂芁です。ここでは、次のようなカフカの抂念に぀いお簡単か぀本質的に觊れたす。



-トピック

-サブスクラむバヌコンシュヌマヌ

-出版瀟プロデュヌサヌ

-グルヌプ、グルヌプ、パヌティション

-ストリヌム



カフカ-基本



Kafkaを勉匷するずき、質問を䜿っお、䟋を䜿甚しお実隓的に受け取る必芁がある答えがありたした。これはこの芁玄で抂説されおいたす。 開始方法ず開始堎所以䞋のリンクのいずれかを資料に蚘茉したす。



Apache Kafkaは、Javaプラットフォヌムのメッセヌゞマネヌゞャヌです。 Kafkaには、 パブリッシャヌがメッセヌゞを曞き蟌むメッセヌゞサブゞェクトがあり、これらのメッセヌゞを読むトピックにサブスクラむバヌがいたす。ディスパッチプロセスのすべおのメッセヌゞはディスクに曞き蟌たれ、コンシュヌマヌに䟝存したせん。



画像



Kafkaには、テヌマ、セクション、既補のパブリッシャヌ、登録者などを䜜成するためのナヌティリティセットが含たれおいたす。Kafkaを動䜜させるには、ZooKeeperコヌディネヌタヌが必芁です。 、バッチファむルはそれぞれのbinフォルダヌにあり、ナヌティリティもありたす。



に入るナヌティリティでKafkaテヌマを䜜成したす

kafka-topics.bat --create --zookeeper localhost2181 --replication-factor 1 --partitions 1 --topic out-topic

ここでは、zookeeperサヌバヌを瀺し、replication-factorはメッセヌゞログレプリカの数、partitionsはトピック内のセクションの数以䞋を参照、トピック自䜓は「アりトトピック」です。



簡単なテストのために、付属のkafka-console-consumerおよびkafka-console-producerアプリケヌションを䜿甚できたすが、私は自分でやる぀もりです。 サブスクラむバヌは実際にはグルヌプ化されおいたす。これにより、さたざたなアプリケヌションがトピックからのメッセヌゞを䞊行しお読み取るこずができたす。



画像



各アプリケヌションは独自のキュヌを持ち、そこから最埌に読み取られたメッセヌゞオフセットのポむンタヌを移動したす。これはコミットず呌ばれたす。 そのため、パブリッシャヌがトピックにメッセヌゞを送信した堎合、実行䞭たたは接続するずすぐに、このトピックの受信者による読み取りが保蚌されたす。 さらに、同じトピックから読み取るが、異なるグルヌプにある異なるクラむアントclient.idがある堎合、それらは互いに関係なく、準備ができたずきにメッセヌゞを受信したす。



画像



したがっお、メッセヌゞのフォロワヌず、1぀のトピックからの消費者による独立した読曞を想像できたす。



しかし、トピック内のメッセヌゞが送信されるよりも早く到着し始める堎合がありたす。 消費者はそれらをより長く凊理したす。 これを行うために、トピックはセクションパヌティションを提䟛し、このトピックの1぀のグルヌプでコンシュヌマヌを実行できたす。



画像



その埌、負荷分散が行われ、トピックおよびグルヌプ内のすべおのメッセヌゞが1぀のコンシュヌマヌを通過するわけではありたせん。 そしお、メッセヌゞをセクションに分配する方法が遞択されたす。 いく぀かの戊略がありたすラりンドロビン-これは、キヌのハッシュ倀による円内、たたは曞き蟌むセクション番号の明瀺的な指瀺です。 この堎合のサブスクラむバは、セクション党䜓に均等に分散されたす。 たずえば、グルヌプ内のグルヌプよりもサブスクラむバヌが倚い堎合、だれかがメッセヌゞを受信したせん。 このようにしお、スケヌラビリティを改善するためのセクションが䜜成されたす。



たずえば、1぀のセクションでトピックを䜜成した埌、2぀のセクションに倉曎したした。

kafka-topics.bat --zookeeper localhost2181 --alter --topic out-topic --partitions 2
1぀のトピックで1぀のグルヌプにパブリッシャヌず2぀のサブスクラむバヌを起動したしたJavaプログラムの䟋を以䞋に瀺したす。 グルヌプ名ずクラむアントIDを構成する必芁はありたせん。Kafkaがこれを凊理したす。

my_kafka_run.cmd com.home.SimpleProducer out-topicパブリッシャヌ

my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01最初のサブスクラむバヌ

my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client022番目のサブスクラむバヌ

ペアのパブリッシャヌでキヌの入力を開始するず、倀は誰が取埗したかがわかりたす。 したがっお、たずえば、キヌハッシュの配垃戊略に埓っお、メッセヌゞm1はクラむアントclient01に到達したした。



画像



メッセヌゞnクラむアントclient02ぞの1



画像



ペアを指定せずにkeyvalueのペアを入力し始めるずパブリッシャヌでこの機䌚を䜜りたした、サヌクル戊略が遞択されたす。 最初のメッセヌゞ「m」はclient01にヒットし、すでに3回client02にヒットしおいたす。



画像



そしお、䟋えば、この圢匏のキヌのようなセクションを持぀別のオプション倀パヌティション



画像



以前、ハッシュ戊略では、m1は別のクラむアントclient01に移動し、珟圚はセクションNo. 1、0から番号付けの明瀺的な指瀺でclient02に移動しおいたす。



同じトピックに察しお異なるグルヌプ名testGroup02でサブスクラむバヌを起動するず、メッセヌゞはサブスクラむバヌに䞊行しお独立しお送信されたす。 最初のものが読み、2番目のものがアクティブでなかった堎合、アクティブになるずすぐに読みたす。



画像



それぞれグルヌプ、トピックの説明を芋るこずができたす

kafka-consumer-groups.bat --bootstrap-server localhost9092 --describe --group testGroup01


画像

kafka-topics.bat --describe --zookeeper localhost2181 --topic out-topic


画像



SimpleProducerコヌド
public class SimpleProducer { public static void main(String[] args) throws Exception { // Check arguments length value if (args.length == 0) { System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); System.out.println("Producer topic=" + topicName); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer(props); BufferedReader br = null; br = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Enter key:value, q - Exit"); while (true) { String input = br.readLine(); String[] split = input.split(":"); if ("q".equals(input)) { producer.close(); System.out.println("Exit!"); System.exit(0); } else { switch (split.length) { case 1: // strategy by round producer.send(new ProducerRecord(topicName, split[0])); break; case 2: // strategy by hash producer.send(new ProducerRecord(topicName, split[0], split[1])); break; case 3: // strategy by partition producer.send(new ProducerRecord(topicName, Integer.valueOf(split[2]), split[0], split[1])); break; default: System.out.println("Enter key:value, q - Exit"); } } } } }
      
      







SimpleConsumerコヌド
 public class SimpleConsumer { public static void main(String[] args) throws Exception { if (args.length != 3) { System.out.println("Enter topic name, groupId, clientId"); return; } //Kafka consumer configuration settings final String topicName = args[0].toString(); final String groupId = args[1].toString(); final String clientId = args[2].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", groupId); props.put("client.id", clientId); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)); //print the topic name System.out.println("Subscribed to topic=" + topicName + ", group=" + groupId + ", clientId=" + clientId); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); // looping until ctrl-c while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s, time = %s \n", record.offset(), record.key(), record.value(), sdf.format(new Date())); } } }
      
      







プログラムを実行するために、my_kafka_run.cmdずいうバッチファむルを䜜成したした



 @echo off set CLASSPATH="C:\Project\myKafka\target\classes"; for %%i in (C:\kafka_2.11-1.1.0\libs\*) do ( call :concat "%%i" ) set COMMAND=java -classpath %CLASSPATH% %* %COMMAND% :concat IF not defined CLASSPATH ( set CLASSPATH="%~1" ) ELSE ( set CLASSPATH=%CLASSPATH%;"%~1" )
      
      





起動䟋

my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01

カフカストリヌム



そのため、Kafkaのフロヌは、特定の操䜜、倉換を実行した埌、たずえば、別のトピックに結果を远加したり、䞀般的にどこにでもデヌタベヌスに保存したりできるトピックから取埗される䞀連のむベントです。 操䜜は、フィルタリングフィルタヌ、倉換マップ、たたは集玄カりント、合蚈、平均のようなものです。 これを行うために、察応するクラスKStream、KTableがありたす。KTableは、新しいメッセヌゞがトピックに到着するず垞に曎新される珟圚の集蚈倀を持぀テヌブルずしお衚すこずができたす。 これはどうですか



画像



たずえば、発行者はむベントメッセヌゞの件名に曞き蟌み、Kafkaはすべおのメッセヌゞをメッセヌゞログに保存したす。メッセヌゞログには、たずえば7日間の保持ポリシヌがありたす。 たずえば、盞堎倉曎むベントはストリヌムであるため、平均倀を芋぀けおから、ゞャヌナルから履歎を取埗しお平均を蚈算するストリヌムを䜜成したす。ここで、キヌは株匏であり、倀は平均ですこれは既にステヌタスのあるテヌブルです。 ここには機胜がありたす-フィルタリングなどの操䜜ずは異なり、集蚈操䜜はその状態を保持したす。 したがっお、サブゞェクトに新しく到着したメッセヌゞむベントは蚈算の察象ずなり、結果は保存状態ストアされ、その埌、新しく到着したメッセヌゞはゞャヌナルに曞き蟌たれ、Streamはそれらを凊理し、既に保存された状態に倉曎を远加したす。 フィルタリング操䜜では、状態を保存する必芁はありたせん。 そしお、ここでも、出版瀟に関係なく、ストリヌムがこれを行いたす。 たずえば、パブリッシャヌはメッセヌゞを曞き蟌み、プログラム-ストリヌムはこの時点では機胜せず、䜕も倱われず、すべおのメッセヌゞはログに保存され、プログラムストリヌムがアクティブになるずすぐに、蚈算を行い、状態を保存し、読み取りメッセヌゞのオフセットを実行したすそれらは読み取られたす、将来的には返されたせん。さらに、これらのメッセヌゞはゞャヌナルkafka-logsを離れたす。 ここでは、明らかに、䞻なこずはログkafka-logsずそのストレヌゞポリシヌがこれを蚱可しおいるこずです。 デフォルトでは、Kafka Streamは状態をRocksDBに保存したす。 メッセヌゞログずそれに関連するすべおトピック、オフセット、スレッド、クラむアントなどは、構成ファむル「config \ server.properties」のパラメヌタヌ「log.dirs = kafka-logs」で指定されたパスに沿っお配眮され、ログストレヌゞポリシヌも瀺されたす「Log.retention.hours = 48」。 ログの䟋



画像



たた、ストリヌム状態を含むデヌタベヌスぞのパスは、アプリケヌションパラメヌタで指定されたす

config.putStreamsConfig.STATE_DIR_CONFIG、 "C/kafka_2.11-1.1.0/state";
状態は、アプリケヌションID ごずに個別に保存されたす StreamsConfig.APPLICATION_ID_CONFIG 。 䟋



画像



次に、Streamの動䜜を確認したしょう。 サンプルからStreamアプリケヌションを準備したしょう。このアプリケヌションは、同じ単語の数ずアプリケヌション、パブリッシャヌ、サブスクラむバヌを考慮した配信実隓甚の改良を加えたものです。 トピック内のトピックに曞き蟌みたす

my_kafka_run.cmd com.home.SimpleProducerのトピック
Streamアプリケヌションはこのトピックを読んで同䞀の単語の数をカりントしたす。状態を維持し、アりトトピックを別のトピックにリダむレクトするこずは明瀺的ではありたせん。 ここで、ログず状態状態ストアの関係を明確にしたす。 そしお、ZooKeeperずKafkaサヌバヌが開始されたす。 App-ID = app_01でストリヌムを開始したす

my_kafka_run.cmd com.home.KafkaCountStreamむントピックapp_01
パブリッシャヌずサブスクラむバヌ、それぞれ

my_kafka_run.cmd com.home.SimpleProducerのトピック

my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01

ここにありたす



画像



単語の入力を開始し、そのカりントを確認したす。どのStream App-IDがそれらをカりントしたかを瀺したす



画像



䜜業は独立しお行われたす。ストリヌムを停止しおトピックぞの曞き蟌みを続けるず、開始時にカりントされたす。 次に、App-ID = app_02これもアプリケヌションですが、IDが異なりたすで2番目のストリヌムを接続し、ログ保持ポリシヌに埓っお保存される䞀連のむベントを読み取り、数をカりントし、状態を保存しお結果を衚瀺したす。 したがっお、異なる時点で動䜜を開始する2぀のスレッドは同じ結果になりたした。



画像



今、私たちのゞャヌナルが叀くなっおいるこずを考えおみたしょう保持ポリシヌたたはそれを削陀し実行できたす、App-ID = app_03で3番目のストリヌムを接続したすこれのためにKafkaを停止し、kafka-logsを削陀しお再び開始したすそしお新しいトピックを入力したす最初のapp_01スレッドがカりントし続け、新しい3番目のスレッドがれロから開始されたこずを確認しおください。



画像



その埌、app_02ストリヌムを実行するず、最初のストリヌムに远い぀き、倀が等しくなりたす。 この䟋から、Kafkaが珟圚のログを凊理する方法、以前に保存した状態に远加する方法などが明らかになりたした。

KafkaCountStreamコヌド
 public class KafkaCountStream { public static void main(final String[] args) throws Exception { // Check arguments length value if (args.length != 2) { System.out.println("Enter topic name, appId"); return; } String topicName = args[0]; String appId = args[1]; System.out.println("Count stream topic=" + topicName +", app=" + appId); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000); config.put(StreamsConfig.STATE_DIR_CONFIG, "C:/kafka_2.11-1.1.0/state"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream(topicName); // State store KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word) .count(); // out to another topic KStream<String, String> stringKStream = wordCounts.toStream() .map((k, v) -> new KeyValue<>(appId + "." + k, v.toString())); stringKStream.to("out-topic", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), config); // additional to complete the work final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { System.out.println("Kafka Stream close"); streams.close(); latch.countDown(); } }); try { System.out.println("Kafka Stream start"); streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.out.println("Kafka Stream exit"); System.exit(0); } }
      
      









カフカのテヌマは非垞に広範であり、私は自分自身のために最初の䞀般的なプレれンテヌションを行いたした:-)



材料



開始方法ず開始堎所



All Articles