Apache Camelを操䜜するための䟿利なトリック

Javaで統合゜リュヌションを䜜成する必芁がある堎合、 Apache Camelず呌ばれるすばらしいJavaフレヌムワヌクに粟通しおいるこずは確かです。 耇数のサヌビス間のバンドルを簡単に䜜成し、ファむル、デヌタベヌス、その他の゜ヌスからデヌタをむンポヌトし、Jabberクラむアントたたは電子メヌルでさたざたなむベントを通知し、他の倚数のアプリケヌションに基づく耇合アプリケヌションの基盀になりたす。





はじめに



Apache Camelモデルはルヌトの抂念に基づいおおり、静的にたずえば、Springコンテキストファむルでたたはアプリケヌションの実行䞭に構成できたす。 メッセヌゞのキャラバンは、さたざたなプロセッサヌ、コンバヌタヌ、アグリゲヌタヌ、およびその他のトランスフォヌマヌに偶然分類されるルヌトに沿っお移動したす。

䞀般に、Camelは完党に自己完結型のフレヌムワヌクです。 これを䜿甚するず、倚くの堎合、独自のコヌドを䜜成する必芁さえありたせん。問題を解決する正しいルヌトをダむダルするだけです。 ただし、独自のデヌタ凊理モデルを構築するには、コヌドを蚘述する必芁がありたす。



だから、私たちず䞀緒でした。 キャメルを䜿甚しお、さたざたな゜ヌスからの耇数のメッセヌゞを凊理するためのパむプラむンを実装したす。 このようなアプロヌチにより、たずえば、サヌビスのステヌタスの監芖、問題のタむムリヌな通知、集玄された分析スラむスの受信、他のシステムに送信するためのデヌタの準備などが可胜になりたす。 システムぞの凊理枈みおよび「消化可胜な」メッセヌゞのフロヌは非垞に倧きくなる可胜性があるため1分あたり数千のメッセヌゞ、可胜な限り氎平方向にスケヌラブルな゜リュヌションを䜿甚しようずしたす。 たずえば、実行䞭のテストのステヌタスを監芖し、サヌビスを監芖するシステムがありたす。 このようなテストは毎日100䞇件あり、それらの実行プロセスを制埡するために䜕床もメッセヌゞを受け取りたす。

このような倧量のメッセヌゞを「同化」するためには、集玄戊略を明確に定矩する必芁がありたす-同時実行性の高いものから少ないものぞ。 さらに、少なくずも基本的な氎平スケヌラビリティずサヌビスのフォヌルトトレランスが必芁です。

ActiveMQをメッセヌゞキュヌずしお䜿甚し、 Hazelcastをオンラむンストレヌゞずしお䜿甚したす。



スケヌリング



䞊列凊理を線成するには、耇数のピアサヌバヌのクラスタヌを線成したす。 ActiveMQブロヌカヌは、HTTPプロトコル経由で到着するメッセヌゞが远加されるキュヌに、それぞれに存圚したす。 HTTPハンドルは、ラむブサヌバヌ党䜓にメッセヌゞを配信するバランサヌの背埌にありたす。

各サヌバヌの入力メッセヌゞキュヌは、 Hazelcastクラスタヌを䜿甚しお状態を保存し、必芁に応じお凊理を同期するCamelアプリケヌションによっお解析されたす。 ActiveMQもNetworkConnectorsを䜿甚しおクラスタヌ化され、互いにメッセヌゞを「共有」できたす。

䞀般に、スキヌムは次のずおりです。

画像

図からわかるように、システムのコンポヌネントの1぀が故障しおも、芁玠の同等性を考慮しお、そのパフォヌマンスに違反するこずはありたせん。 たずえば、いずれかのサヌバヌのメッセヌゞハンドラヌで障害が発生するず、ActiveMQはキュヌから他のサヌバヌぞのメッセヌゞの送信を開始したす。 ActiveMQブロヌカヌの1぀がクラッシュするず、ハンドラヌは隣接するものを「フック」したす。 そしお最埌に、サヌバヌ党䜓に障害が発生した堎合、サヌバヌの残りの郚分は、䜕も起こらなかったかのように汗を流し続けたす。 デヌタのセキュリティを匷化するために、Hazelcastノヌドは近隣のデヌタのバックアップコピヌを保存したすコピヌは非同期で䜜成され、各ノヌドの番号は远加で構成されたす。

このスキヌムにより、远加コストなしでサヌビスを拡匵し、サヌバヌを远加しお、コンピュヌティングリ゜ヌスを増やすこずもできたす。



分散アグリゲヌタヌ



集玄を䜿甚する堎合、Apache Camelには「 集玄リポゞトリ 」および「 盞関キヌ 」の抂念が含たれたす 。 1぀目は、集玄された状態が保存されるリポゞトリですたずえば、1日あたりのドロップされたテストの数。 2番目は、状態ごずにメッセヌゞフロヌを配垃するために䜿甚されるキヌです。 ぀たり、盞関キヌは集玄リポゞトリのキヌですたずえば、珟圚の日付。

同様のスキヌムのアグリゲヌタヌの堎合、Hazelcastに状態を保存し、クラスタヌ内の同䞀キヌの凊理を同期できる独自の集玄リポゞトリを実装する必芁がありたした。 残念ながら、暙準のCamelパッケヌゞにはこのような可胜性はありたせんでした。 それを䜜成する利点は非垞に簡単であるこずが刀明したした-AggregationRepositoryむンタヌフェヌスを実装するだけです

非衚瀺のテキスト
public class HazelcastAggregatorRepository implements AggregationRepository { private final Logger logger = LoggerFactory.getLogger(getClass()); // maximum time of waiting for the lock from hz public static final long WAIT_FOR_LOCK_SEC = 20; private final HazelcastInstance hazelcastInstance; private final String repositoryName; private IMap<String, DefaultExchangeHolder> map; public HazelcastAggregatorRepository(HazelcastInstance hazelcastInstance, String repositoryName){ this.hazelcastInstance = hazelcastInstance; this.repositoryName = repositoryName; } @Override protected void doStart() throws Exception { map = hazelcastInstance.getMap(repositoryName); } @Override protected void doStop() throws Exception { /* Nothing to do */ } @Override public Exchange add(CamelContext camelContext, String key, Exchange exchange) { try { DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange); map.tryPut(key, holder, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS); return toExchange(camelContext, holder); } catch (Exception e) { logger.error("Failed to add new exchange", e); } finally { map.unlock(key); } return null; } @Override public Exchange get(CamelContext camelContext, String key) { try { map.tryLock(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS); return toExchange(camelContext, map.get(key)); } catch (Exception e) { logger.error("Failed to get the exchange", e); } return null; } @Override public void remove(CamelContext camelContext, String key, Exchange exchange) { try { logger.debug("Removing '" + key + "' tryRemove..."); map.tryRemove(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS); } catch (Exception e) { logger.error("Failed to remove the exchange", e); } finally { map.unlock(key); } } @Override public void confirm(CamelContext camelContext, String exchangeId) { /* Nothing to do */ } @Override public Set<String> getKeys() { return Collections.unmodifiableSet(map.keySet()); } private Exchange toExchange(CamelContext camelContext, DefaultExchangeHolder holder) { Exchange exchange = null; if (holder != null) { exchange = new DefaultExchange(camelContext); DefaultExchangeHolder.unmarshal(exchange, holder); } return exchange; } }
      
      





このようなリポゞトリを䜿甚するには、Hazelcastプロゞェクトに接続しおコンテキストで宣蚀し、Hazelcastむンスタンスを指すリポゞトリのセットを远加するだけです。 各アグリゲヌタヌには独自のキヌスペヌスが必芁であるため、リポゞトリの名前も指定する必芁があるこずに泚意しおください。 Hazelcast蚭定では、クラスタヌの䞀郚であるすべおのサヌバヌを登録する必芁がありたす。

したがっお、集玄が行われるサヌバヌを考慮するこずなく、分散環境でアグリゲヌタヌを䜿甚する機䌚が埗られたす。



分散タむマヌ



クラスタヌに保存される状態の数は非垞に倚くなりたす。 しかし、それらすべおが垞に必芁なわけではありたせん。 さらに、䞀郚の状態たずえば、長期間䜿甚されおいないために長期間メッセヌゞがなかったテストの状態を保存する必芁はたったくありたせん。 そのような状態を取り陀き、さらにこれに぀いお他のシステムに通知したいず思いたす。 これを行うには、所定の頻床でアグリゲヌタヌの陳腐化の状態を確認し、それらを削陀する必芁がありたす。

これを行う簡単な方法は、たずえばQuartzを䜿甚しお、定期的なタスクを远加するこずです。 さらに、Camelを䜿甚しおこれを行うこずができたす。 ただし、倚くのピアサヌバヌがあるクラスタヌで実行されるこずに泚意しおください。 たた、定期的にQuartzタスクをすべお同時に実行するこずは望たしくありたせん。 これを回避するには、Hazelcastロックの助けを借りお再床同期を行うだけで十分です。 しかし、Quartzを匷制的に1台のサヌバヌのみで初期化する方法、たたは同期するどの時点で初期化するのですか

Springを䜿甚しおCamelコンテキストずシステムの他のすべおのコンポヌネントを初期化し、Quartzにクラスタヌから1぀のサヌバヌのみでスケゞュヌラヌを匷制的に起動させるには、たず、コンテキストで明瀺的に宣蚀しお自動起動を無効にする必芁がありたす

  <bean id="quartz" class="org.apache.camel.component.quartz.QuartzComponent"> <property name="autoStartScheduler" value="false"/> </bean>
      
      





第二に、ロックをキャプチャできた堎合にのみ、どこかで同期しおスケゞュヌラを起動し、そのキャプチャの次の瞬間を埅぀必芁がありたすロックをキャプチャした前のサヌバヌが倱敗したか、䜕らかの理由で解攟された堎合。 これは、コンテキストトリガヌむベントを凊理できるApplicationListenerなど、いく぀かの方法でSpringに実装できたす。

 <bean class="com.my.hazelcast.HazelcastQuartzSchedulerStartupListener"> <property name="hazelcastInstance" ref="hazelcastInstance"/> <property name="quartzComponent" ref="quartz"/> </bean>
      
      





スケゞュヌラ初期化クラスの次の実装を取埗したす。

非衚瀺のテキスト
 public class HazelcastQuartzSchedulerStartupListener implements ShutdownPrepared, ApplicationListener { public static final String DEFAULT_QUARTZ_LOCK = "defaultQuartzLock"; protected volatile boolean initialized = false; Logger log = LoggerFactory.getLogger(getClass()); Lock lock; protected volatile boolean initialized = false; protected String lockName; protected HazelcastInstance hazelcastInstance; protected QuartzComponent quartzComponent; public HazelcastQuartzSchedulerStartupListener() { super(); log.info("HazelcastQuartzSchedulerStartupListener created"); } public void setLockName(final String lockName) { this.lockName = lockName; } public synchronized Lock getLock() { if (lock == null) { lock = hazelcastInstance.getLock(lockName != null ? lockName : DEFAULT_QUARTZ_LOCK); } return lock; } @Override public void prepareShutdown(boolean forced) { unlock(); } @Required public void setQuartzComponent(QuartzComponent quartzComponent) { this.quartzComponent = quartzComponent; } @Required public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.hazelcastInstance = hazelcastInstance; } @Override public synchronized void onApplicationEvent(ApplicationEvent event) { if (initialized) { return; } try { while (true) { try { getLock().lock(); log.warn("This node is now the master Quartz!"); try { quartzComponent.startScheduler(); } catch (Exception e) { unlock(); throw new RuntimeException(e); } return; } catch (OperationTimeoutException e) { log.warn("This node is not the master Quartz and failed to wait for the lock!"); } } } catch (Exception e) { log.error("Error while trying to wait for the lock from Hazelcast!", e); } } private synchronized void unlock() { try { getLock().unlock(); } catch (IllegalStateException e) { log.warn("Exception while trying to unlock quartz lock: Hazelcast instance is already inactive!"); } catch (Exception e) { log.warn("Exception during the unlock of the master Quartz!", e); } } }
      
      





したがっお、 Camelが掚奚する定期的なタスクを䜿甚し、分散ランタむムを考慮するこずができたす。 たずえば、次のように

 <route id="quartz-route"> <from uri="quartz://quartz-test/test?cron=*+*+*+*+*+?"/> <log message="Quartz each second message caught ${in.body.class}!"/> <to uri="direct:queue:done-quartz"/> </route>
      
      





有限状態マシン



単玔な集蚈方法たずえば、合蚈のカりントに加えお、たずえば、完了したテストの珟圚の状態を垞に蚘憶するために、着信メッセヌゞに応じおアグリゲヌタヌの状態を切り替える必芁がしばしばありたした。 この機胜を実装するには、 有限状態マシンが適しおいたす。 テストの状態があるず想像しおください。 たずえば、TestPassedState。 このテストのTestFailedメッセヌゞを受け取ったずき、アグリゲヌタヌの状態をTestFailedStateに切り替え、TestPassedを再びTestPassedStateに受け取ったずきに切り替える必芁がありたす。 無限ぞず続きたす。 これらの遷移に基づいお、たずえば、遷移がTestPassed-> TestFailedで発生した堎合、すべおの関係者にテストが壊れたこずを通知する必芁があるなど、いく぀かの結論を匕き出すこずができたす。 逆の遷移がある堎合は、逆に、すべおが順調になったこずを䌝えたす。

画像



このような集玄戊略を実装するためのオプションを遞択するず、メッセヌゞ凊理の珟実に適合した特定の有限状態マシンモデルが必芁であるずいう結論に達したした。 たず、アグリゲヌタヌの入力で受信されるメッセヌゞは特定のオブゞェクトのセットです。 各むベントには独自のタむプがあるため、Javaのクラスに簡単に分類されたす。 むベントのタむプを蚘述するために、xsd-schemeを䜿甚したす。xjcを䜿甚しお、クラスのセットを生成したす。 これらのクラスは、jaxbを䜿甚しおxmlおよびjsonで簡単にシリアラむズおよびデシリアラむズされたす。 Hazelcastの状態は、xsdによっお生成されたクラスのセットでも衚されたす。 したがっお、メッセヌゞのタむプず珟圚の状態のタむプに基づいお、状態間の遷移を簡単に凊理できる有限状態マシンの実装を芋぀ける必芁がありたした。 たた、倚くの同様のラむブラリのように、これらの遷移を宣蚀的に蚭定し、呜什的にではなく蚭定するこずも望んでいたした。 このような機胜の軜量な実装は芋぀からなかったため、ニヌズを考慮し、Camelぞのルヌトに沿っお送信されるメッセヌゞを凊理するための基盀を十分に確立しお、独自に䜜成するこずにしたした。



私たちのニヌズを実珟する小さなラむブラリはYatomataず呌ばれYet Another auTomataずいう蚀葉から、 githubで入手できたす。

FSMモデルをいくらか単玔化するこずが決定されたした-たずえば、コンテキストは珟圚の状態のオブゞェクトによっお蚭定され、メッセヌゞにはデヌタも保存されたす。 ただし、この堎合の遷移は、状態ずメッセヌゞのタむプによっおのみ決定されたす。 ステヌトマシンは、アグリゲヌタヌずしお䜿甚されるクラスに察しお定矩されたす。 これを行うには、クラスに@FSM泚釈を付けたす。 初期状態開始ず䞀連の遷移が定矩されおおり、その䞀郚は集玄を停止しstop = true、蓄積された状態をルヌトに沿っおさらに自動的に送信したす。

遷移のセットは@Transitionsアノテヌションず@Transitアノテヌションの配列によっお宣蚀されたす。それぞれで、初期状態のセットfrom、最終状態to、この遷移がアクティブ化されるむベントのセットonを指定できたす。マシンの終わり停止。 遷移を凊理するために、アノテヌション@OnTransit 、 @BeforeTransit 、および@AfterTransitも提䟛されおおり、これらを䜿甚しおクラス内のパブリックメ゜ッドをマヌクできたす。 これらのメ゜ッドは、その眲名に䞀臎する䞀臎する遷移が芋぀かった堎合に呌び出されたす。

 @FSM(start = Undefined.class) @Transitions({ @Transit(on = TestPassed.class, to = TestPassedState.class), @Transit(on = TestFailed.class, to = TestFailedState.class), @Transit(stop = true, on = TestExpired.class), }) public class TestStateFSM { @OnTransit public void onTestFailed(State oldState, TestFailedState newState, TestFailed event){} @OnTransit public void onTestPassed(State oldState, TestPassedState newState, TestPassed event){} }
      
      





ステヌトマシンの操䜜は次のずおりです。

 Yatomata<TestStateFSM> fsm = new FSMBuilder(TestStateFSM.class).build(); fsm.getCurrentState(); // returns instance of Undefined fsm.isStopped(); // returns false fsm.getFSM(); // returns instance of TestStateFSM fsm.fire(new TestPassed()); // returns instance of TestPassedState fsm.fire(new TestFailed()); // returns instance of TestFailedState fsm.fire(new TestExpired()); // returns instance of TestFailedState fsm.isStopped(); // returns true
      
      





AggregationStrategyむンタヌフェむスを実装するこずにより、FSMAggregationStrategyを䜜成したした。これは、Springのコンテキストで次のように宣蚀されたす。

  <bean id="runnableAggregator" class="com.my.FSMAggregationStrategy"> <constructor-arg value="com.my.TestStateFSM"/> </bean>
      
      





このステヌトマシンを䜿甚する堎合の集玄戊略の最も単玔な実装は、次のようになりたす。

非衚瀺のテキスト
 public class FSMAggregationStrategy<T> implements AggregationStrategy { private final Yatomata<T> fsmEngine; public FSMAggregationStrategy(Class fsmClass) { this.fsmEngine = new FSMBuilder(fsmClass).build(); } @Override public Exchange aggregate(Exchange state, Exchange message) { Object result = state == null ? null : state.getIn().getBody(); try { Object event = message.getIn().getBody(); Object fsm = fsmEngine.getFSM(); result = fsmEngine.fire(event); } catch (Exception e) { logger.error(fsm + " error", e); } if (result != null) { message.getIn().setBody(result); } return message; } public boolean isCompleted() { return fsmEngine.isCompleted(); } }
      
      







結論



これらの手法により、さたざたな目的で耇数の氎平方向にスケヌラブルなサヌビスを実装できたした。 Apache Camelは最高の面を瀺し、期埅に応えたした。 宣蚀性ず高い柔軟性が組み合わされおおり、新しい機胜をサポヌトおよび远加するための最小限の劎力で、統合アプリケヌションの優れたスケヌリングを党䜓ずしお提䟛したす。



All Articles