Apache Ignite:RAMでの分散コンピューティング





こんにちは、Habr!



新しいApacheソリューションに引き続き関心があります。 5月にはHolden Karauの「High Performance Spark」 (組版の本)、8月にはNii Narhidの「Kafka:The Definitive Guide」 (まだ翻訳中)という本をリリースしたいと思っています。 本日は、Apache Igniteについての簡単な紹介記事を提供し、トピックに対する関心の規模を評価したいと思います。



素敵な読書を!



Apache Igniteは比較的新しいソリューションですが、その人気は急速に高まっています。 Ignite機能によりいくつかのツールに似ているため、データベースエンジンの特定の亜種に起因することは困難です。 このツールの主な目的は、RAMに分散データを保存することと、キーと値の形式で情報を保存することです。 Igniteには、一般的なRDBMS機能、特にSQLクエリとACIDトランザクションのサポートもあります。 しかし、これは、このソリューションがSQLでトランザクションを処理するための典型的なデータベースであることを意味するものではありません。 ここでは外部キーの制限はサポートされておらず、トランザクションはキーと値のレベルでのみ利用できます。 ただし、Apache Igniteは非常に興味深いソリューションのようです。



Apache Igniteは、Spring Bootアプリケーションに組み込まれたホストとして簡単に起動できます。 これを実現する最も簡単な方法は、Spring Data Igniteライブラリを使用することです。 Apache Igniteは、統合されたSpring Dataインターフェイスを使用してApache Ignite SQLグリッドへのアクセスを提供するだけでなく、基本的なCRUD操作をサポートするSpring Data CrudRepository



実装しています。 SQLサポートとACIDパラダイムを使用してディスクストレージにデータの永続性を提供しますが、MySQLデータベースにRAMキャッシュオブジェクトを保存するためのソリューションを開発しました。 提案されたソリューションのアーキテクチャを次の図に示します-ご覧のとおり、非常に単純です。 アプリケーションは、Apache Igniteに配置されたRAMキャッシュにデータを配置します。 Apache Igniteは、非同期バックグラウンドタスク中にこれらの変更をデータベースと自動的に同期します。 このアプリケーションでデータを読み取る方法も、驚くことではありません。 エンティティがキャッシュされていない場合、データベースから読み取られ、将来のためにキャッシュされます。







ここでは、この種のアプリケーションの開発方法について詳しく説明します。 結果はGitHubに投稿されます。 インターネットでさらにいくつかの例を見つけましたが、基本だけがカバーされています。 キャッシュからデータベースにオブジェクトを書き込むようにApache Igniteを構成する方法、および複数のキャッシュを使用してより複雑なマージ要求を作成する方法を示します。 データベースを起動することから始めましょう。



1. MySQLデータベースをセットアップする



MySQLデータベースをローカルで実行するには、もちろんDockerコンテナを使用するのが最善です。 Docker for WindowsのMySQLデータベースは現在192.168.99.100:33306で利用可能です。



  docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql
      
      





次に、アプリケーションエンティティがデータを保存するために使用するテーブルPERSON



CONTACT



を作成します。 それらは、テーブルを1 ... Nとして参照しますCONTACT



テーブルには、 PERSON id



指す外部キーが含まれます。



  CREATE TABLE `person` ( `id` int(11) NOT NULL, `first_name` varchar(45) DEFAULT NULL, `last_name` varchar(45) DEFAULT NULL, `gender` varchar(10) DEFAULT NULL, `country` varchar(10) DEFAULT NULL, `city` varchar(20) DEFAULT NULL, `address` varchar(45) DEFAULT NULL, `birth_date` date DEFAULT NULL, PRIMARY KEY (`id`) ); CREATE TABLE `contact` ( `id` int(11) NOT NULL, `location` varchar(45) DEFAULT NULL, `contact_type` varchar(10) DEFAULT NULL, `person_id` int(11) NOT NULL, PRIMARY KEY (`id`) ); ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC); ALTER TABLE `ignite`.`contact` ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;
      
      





2. Mavenを構成する



Apache IgniteのSpring Dataリポジトリーを開始するための最も簡単な方法は、アプリケーションのpom.xml



ファイルに次のMaven依存関係を追加することです。 他のすべてのIgnite依存関係は自動的に含まれます。 データベース接続を構成するには、MySQL JDBCドライバーとSpring JDBC依存関係も必要です。 Apache Igniteをアプリケーションに埋め込み、キャッシュをデータベーステーブルと同期できるようにMySQLデータベースに接続する必要があるため、これらが必要です。



 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-spring-data</artifactId> <version>${ignite.version}</version> </dependency>
      
      





3. Igniteノードを構成する



IgniteConfiguration



クラスを使用すると、Igniteノードで使用可能なすべての設定を構成できます。 この場合、キャッシュ構成(1)が最も重要です。 マスターキーとエンティティクラスをインデックス付きタイプとして追加する必要があります(2)。 次に、キャッシュ更新のデータベースへのエクスポートを提供し(3)、キャッシュにない情報をデータベースから読み取る必要があります(4)。 IgniteノードとMySQL間の相互作用は、 CacheJdbcPojoStoreFactory



(5)クラスを使用して構成できます。 そこで、 DataSource @Bean



(6)、方言(7)、およびオブジェクトのフィールドとテーブルの列の間の対応(8)を渡す必要があります。



 @Bean public Ignite igniteInstance() { IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setIgniteInstanceName("ignite-1"); cfg.setPeerClassLoadingEnabled(true); CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1) ccfg2.setIndexedTypes(Long.class, Contact.class); // (2) ccfg2.setWriteBehindEnabled(true); ccfg2.setWriteThrough(true); // (3) ccfg2.setReadThrough(true); // (4) CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5) f2.setDataSource(datasource); // (6) f2.setDialect(new MySQLDialect()); // (7) JdbcType jdbcContactType = new JdbcType(); // (8) jdbcContactType.setCacheName("ContactCache"); jdbcContactType.setKeyType(Long.class); jdbcContactType.setValueType(Contact.class); jdbcContactType.setDatabaseTable("contact"); jdbcContactType.setDatabaseSchema("ignite"); jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id")); jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"), new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"), new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId")); f2.setTypes(jdbcContactType); ccfg2.setCacheStoreFactory(f2); CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache"); ccfg.setIndexedTypes(Long.class, Person.class); ccfg.setWriteBehindEnabled(true); ccfg.setReadThrough(true); ccfg.setWriteThrough(true); CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>(); f.setDataSource(datasource); f.setDialect(new MySQLDialect()); JdbcType jdbcType = new JdbcType(); jdbcType.setCacheName("PersonCache"); jdbcType.setKeyType(Long.class); jdbcType.setValueType(Person.class); jdbcType.setDatabaseTable("person"); jdbcType.setDatabaseSchema("ignite"); jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id")); jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"), new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"), new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"), new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"), new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"), new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"), new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate")); f.setTypes(jdbcType); ccfg.setCacheStoreFactory(f); cfg.setCacheConfiguration(ccfg, ccfg2); return Ignition.start(cfg); }
      
      





DockerコンテナとしてのMySQLのSpringデータソース設定は次のとおりです。



spring:

datasource:

name: mysqlds

url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false

username: ignite

password: ignite123








ここで、Apache Igniteにはいくつかの欠点がないわけではないことに注意してください。 たとえば、 Enum



を整数にマップし、その序数値を取りますが、VARCHARをJDCBタイプとして構成します。 そのようなシリーズがデータベースから読み取られると、オブジェクトのEnumに誤って表示されnull



。この応答フィールドではnull



になりnull







  new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")
      
      





4.モデルオブジェクト



前述のように、データベーススキーマには2つのテーブルがあります。 また、2つのモデルクラスと2つのキャッシュ構成があり、各モデルクラスに1つずつあります。 以下は、モデルクラスの実装です。 ここで注意すべき最も興味深いことの1つは、 AtomicLong



クラスを使用してIDを生成することです。 これは、シーケンスジェネレーターとして機能するIgniteの基本コンポーネントの1つです。 また、特定のアノテーション@QuerySqlField



も確認@QuerySqlField



ます。 フィールドに付随する場合、このフィールドはSQLでクエリパラメータとして使用できることを意味します。



  @QueryGroupIndex.List( @QueryGroupIndex(name="idx1") ) public class Person implements Serializable { private static final long serialVersionUID = -1271194616130404625L; private static final AtomicLong ID_GEN = new AtomicLong(); @QuerySqlField(index = true) private Long id; @QuerySqlField(index = true) @QuerySqlField.Group(name = "idx1", order = 0) private String firstName; @QuerySqlField(index = true) @QuerySqlField.Group(name = "idx1", order = 1) private String lastName; private Gender gender; private Date birthDate; private String country; private String city; private String address; private List<Contact> contacts = new ArrayList<>(); public void init() { this.id = ID_GEN.incrementAndGet(); } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } public Gender getGender() { return gender; } public void setGender(Gender gender) { this.gender = gender; } public Date getBirthDate() { return birthDate; } public void setBirthDate(Date birthDate) { this.birthDate = birthDate; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public List<Contact> getContacts() { return contacts; } public void setContacts(List<Contact> contacts) { this.contacts = contacts; } }
      
      





5.リポジトリに点火する



Spring Data JPAでリポジトリがどのように作成されるか知っていると思います。 リポジトリ処理は、 main



または@Configuration



で提供する必要があります。



 @SpringBootApplication @EnableIgniteRepositories public class IgniteRestApplication { @Autowired DataSource datasource; public static void main(String[] args) { SpringApplication.run(IgniteRestApplication.class, args); } // ... }
      
      





次に、 @Repository



インターフェースを@Repository



ベースインターフェースで拡張します。 id



パラメータを持つ継承されたメソッドのみをサポートします。 PersonRepository



PersonRepository



スニペットPersonRepository



v Spring DataおよびIgniteクエリで採用されている命名規則を使用して、いくつかの検索方法を定義しました。 これらの例は、必要に応じて、クエリ結果で完全なオブジェクトまたはオブジェクトから選択したフィールドを返すことができることを示しています。



 @RepositoryConfig(cacheName = "PersonCache") public interface PersonRepository extends IgniteRepository<Person, Long> { List<Person> findByFirstNameAndLastName(String firstName, String lastName); @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?") List<Contact> selectContacts(String firstName, String lastName); @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?") List<List<?>> selectContacts2(String firstName, String lastName); }
      
      





6. APIとテスト



これで、RESTコントローラクラスにリポジトリコンポーネントを埋め込むことができます。 このAPIは、新しいオブジェクトをキャッシュに追加したり、既存のオブジェクトを更新または削除したり、主キーやその他のより複雑なインデックスで検索したりするためのメソッドを提供します。



 @RestController @RequestMapping("/person") public class PersonController { private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class); @Autowired PersonRepository repository; @PostMapping public Person add(@RequestBody Person person) { person.init(); return repository.save(person.getId(), person); } @PutMapping public Person update(@RequestBody Person person) { return repository.save(person.getId(), person); } @DeleteMapping("/{id}") public void delete(Long id) { repository.delete(id); } @GetMapping("/{id}") public Person findById(@PathVariable("id") Long id) { return repository.findOne(id); } @GetMapping("/{firstName}/{lastName}") public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) { return repository.findByFirstNameAndLastName(firstName, lastName); } @GetMapping("/contacts/{firstName}/{lastName}") public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) { List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName); List<Contact> contacts = repository.selectContacts(firstName, lastName); persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList()))); LOGGER.info("PersonController.findByIdWithContacts: {}", contacts); return persons; } @GetMapping("/contacts2/{firstName}/{lastName}") public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) { List<List<?>> result = repository.selectContacts2(firstName, lastName); List<Person> persons = new ArrayList<>(); for (List<?> l : result) { persons.add(mapPerson(l)); } LOGGER.info("PersonController.findByIdWithContacts: {}", result); return persons; } private Person mapPerson(List<?> l) { Person p = new Person(); Contact c = new Contact(); p.setId((Long) l.get(0)); p.setFirstName((String) l.get(1)); p.setLastName((String) l.get(2)); c.setId((Long) l.get(3)); c.setType((ContactType) l.get(4)); c.setLocation((String) l.get(4)); p.addContact(c); return p; } }
      
      





もちろん、作成されたソリューションのパフォーマンスを確認することは重要です。特に、分散データをRAMおよびデータベースに保存する場合は重要です。 これを行うために、多数のオブジェクトをキャッシュしてから検索メソッドを呼び出すいくつかのjunitテストを作成しました(ランダムデータが入力に使用されます)-これがクエリのパフォーマンスの確認方法です。 多くのPerson



およびContact



オブジェクトを生成し、エンドポイントAPIを使用してキャッシュに配置するメソッドを次に示します。



 @Test public void testAddPerson() throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); for (int j = 0; j < 10; j++) { es.execute(() -> { TestRestTemplate restTemplateLocal = new TestRestTemplate(); Random r = new Random(); for (int i = 0; i < 1000000; i++) { Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class); int x = r.nextInt(6); for (int k = 0; k < x; k++) { restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class); } } }); } es.shutdown(); es.awaitTermination(60, TimeUnit.MINUTES); }
      
      





Spring Bootは、APIの応答速度を判断するための基本的な特性を取得するためのメソッドを提供します。 この機能を有効にするには、 Spring Actuator



依存関係を有効にする必要があります。 Metricsエンドポイントは、 localhost :8090 / metricsで利用できます。 各APIメソッドが機能するのにかかる時間を示すだけでなく、アクティブなスレッドの数や空きメモリなどのインジケーターの統計も表示します。



7.アプリケーションの起動



Apache Igniteノードが統合された結果のアプリケーションを実行します。 Igniteのドキュメントに含まれるパフォーマンスのヒントを考慮し、以下に示すJVM構成を決定しました。



  java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar
      
      





これで、 JUnit IgniteRestControllerTest



テストクラスを実行できます。 一定量のデータをキャッシュし、検索メソッドを呼び出します。 パラメーターは、1MのPerson



オブジェクトと2.5MのContact



オブジェクトがキャッシュで使用されるテスト用に提供されます。 各検索方法は、平均で1ミリ秒で実行されます。



  { "mem": 624886, "mem.free": 389701, "processors": 4, "instance.uptime": 2446038, "uptime": 2466661, "systemload.average": -1, "heap.committed": 524288, "heap.init": 524288, "heap.used": 133756, "heap": 1048576, "threads.peak": 107, "threads.daemon": 25, "threads.totalStarted": 565, "threads": 80, ... "gauge.response.person.contacts.firstName.lastName": 1, "gauge.response.contact": 1, "gauge.response.person.firstName.lastName": 1, "gauge.response.contact.location.location": 1, "gauge.response.person.id": 1, "gauge.response.person": 0, "counter.status.200.person.id": 1000, "counter.status.200.person.contacts.firstName.lastName": 1000, "counter.status.200.person.firstName.lastName": 1000, "counter.status.200.contact": 2500806, "counter.status.200.person": 1000000, "counter.status.200.contact.location.location": 1000 }
      
      






All Articles