Javaの世界での仕組み。 スレッドプール

基本的なプログラミングの原則は次のとおりです。車輪を再発明しないでください。 しかし、時々、何が起こっているのか、どのようにツールを誤って使用するのかを理解するために、これを行う必要があります。 今日、マルチスレッドのタスク実行パターンを発明しています。







プロセッサの負荷が大きくなる原因を想像してください。







public class Counter { public Double count(double a) { for (int i = 0; i < 1000000; i++) { a = a + Math.tan(a); } return a; } }
      
      





そのようなタスクをできるだけ早く処理したいので、*を試してください。







 public class SingleThreadClient { public static void main(String[] args) { Counter counter = new Counter(); long start = System.nanoTime(); double value = 0; for (int i = 0; i < 400; i++) { value += counter.count(i); } System.out.println(format("Executed by %ds, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); } }
      
      





4つの物理コアを持つ手押し車で、CPU使用率top-pid {pid}







画像







ランタイム104秒







お気づきのように、1つのスレッドを実行して1つのプロセッサを1つのJavaプロセスにロードすると100%になりますが、ユーザー空間の合計プロセッサー負荷は2.5%に過ぎず、多くの未使用システムリソースがあります







ワークフローを追加して、さらに使用してみましょう。







 public class MultithreadClient { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(8); Counter counter = new Counter(); long start = System.nanoTime(); List<Future<Double>> futures = new ArrayList<>(); for (int i = 0; i < 400; i++) { final int j = i; futures.add( CompletableFuture.supplyAsync( () -> counter.count(j), threadPool )); } double value = 0; for (Future<Double> future : futures) { value += future.get(); } System.out.println(format("Executed by %ds, value : %f", (System.nanoTime() - start) / (1000_000_000), value)); threadPool.shutdown(); } }
      
      





使用されるリソース:







画像







ThreadPoolExecutor



加速のために、ThreadPoolを使用しました。javaでは、ThreadPoolExecutorがその役割を果たします。これは、直接実装するか、Utilitiesクラスのメソッドの1つから実装できます。 ThreadPoolExecutorの中を見ると、キューを見つけることができます。







 private final BlockingQueue<Runnable> workQueue;
      
      





初期プールのサイズよりも多くのスレッドが実行されている場合にタスクが収集されます。 初期プールサイズのスレッドの数が少ない場合、プールは新しいスレッドを開始しようとします。







 public void execute(Runnable command) { ... if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; ... if (isRunning(c) && workQueue.offer(command)) { ... addWorker(null, false); ... } }
      
      





各addWorkerはRunnableタスクを使用して新しいスレッドを開始します。Runnableタスクは、workQueueをポーリングして新しいタスクを探し、実行します。







 final void runWorker(Worker w) { ... try { while (task != null || (task = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)) != null) { ... task.run(); ... }
      
      





ThreadPoolExecutorには非常に明確なjavadocがあるため、言い換えることは意味がありません。 代わりに、独自に作成してみましょう。







 public class ThreadPool implements Executor { private final Queue<Runnable> workQueue = new ConcurrentLinkedQueue<>(); private volatile boolean isRunning = true; public ThreadPool(int nThreads) { for (int i = 0; i < nThreads; i++) { new Thread(new TaskWorker()).start(); } } @Override public void execute(Runnable command) { if (isRunning) { workQueue.offer(command); } } public void shutdown() { isRunning = false; } private final class TaskWorker implements Runnable { @Override public void run() { while (isRunning) { Runnable nextTask = workQueue.poll(); if (nextTask != null) { nextTask.run(); } } } } }
      
      





ここで、プールで上記と同じタスクを実行しましょう。

MultithreadClientの行を変更します。







 // ExecutorService threadPool = Executors.newFixedThreadPool (8); ThreadPool threadPool = new ThreadPool (8);
      
      





実行時間はほぼ同じ-15秒です。







スレッドプールサイズ



プールで実行中のスレッドの数をさらに増やしてみましょう-最大100まで。







 ThreadPool threadPool = new ThreadPool(100);
      
      





実行時間が28秒に増加したことがわかります-なぜこれが起こったのですか?







たとえば、プロセッサコンテキストの絶え間ない切り替え、あるタスクで作業を一時停止し、別のタスクに切り替える必要がある場合、生産性が低下する可能性があるいくつかの独立した理由があります。 プロセッサが状態の切り替えでビジーである間、どのタスクでも有用な作業を行いません。







プロセスコンテキストスイッチの数は、topコマンドが発行されたときにcswパラメーターを調べることで確認できます。







8スレッド:

画像







100スレッド:

画像







プールサイズの選択方法







サイズは、実行されるタスクのタイプによって異なります。 もちろん、スレッドプールのサイズをハードコーディングする必要はめったにありません。むしろ、カスタマイズ可能であり、最適なサイズは実行可能タスクのスループットの監視から導き出されます。







スレッドが互いにブロックしないと仮定すると、I / O待機サイクルはなく、処理時間は同じで、最適なスレッドプール= Runtime.getRuntime()。AvailableProcessors()+ 1です。







スレッドが主にI / Oを期待している場合、最適なプールサイズは、プロセスタイムアウトと計算時間の比率によって増加する必要があります。 たとえば。 iowaitで時間の50%を費やすプロセスがある場合、プールサイズは2 * Runtime.getRuntime()。AvailableProcessors()+ 1になります。







他の種類のプール



  1. キュー内のMemoryAwareThreadPoolExecutorタスクが多すぎる場合にタスク送信をブロックするメモリ制限スレッドプール







  2. 実行時にプールサイズを制御および構成するためにJMXコンポーネントを登録するスレッドプール。

    JMXEnabledThreadPoolExecutor


ソースコードはこちらにあります







[*]-テストは正確ではありません。より正確なテストを使用するには、 http//openjdk.java.net/projects/code-tools/jmh/








All Articles