Node.jsで100万のいいねまたはタスクキューを収集する

先週、1ラウンドの日付を祝いました-Likeastoreデータベースに蓄積された100万のユーザーのいいね!



JavaScriptを使用しています。現在のサービスはすべてJavaScript / Node.jsで記述されています。 一般に、私たちのプロジェクトでNode.jsを使用することを後悔していません。HTTPAPIを実装するための最良のツールであることが証明されています。 しかし、「いいね」を収集するには、常に動作するdaemon



でなければなりません。 おそらくNode.jsの最も典型的なタスクではありません-実装の詳細といくつかの落とし穴については、さらに読みます。



コレクター



コレクター、Likeastoreアーキテクチャーの主要コンポーネント。 オープンサービスAPIを介してデータを収集し、応答を処理し、データとその現在の状態を保存します。 実際、これは実行されるタスクのリストを作成して開始し、その後手順が繰り返される「無限のサイクル」です。



最大の効率を得るために、コレクターの2つのインスタンスを実行し、異なるモードで動作します:初期、通常。 初期モードでは、コレクターは新しく接続されたネットワークのみを処理します。 したがって、ユーザーはネットワークに接続した後、できるだけ早く「いいね」を受け取ります。 すべての「いいね」がアップロードされると、ネットワークは通常モードになり、別のインスタンスによって処理されます。この場合、充電間隔ははるかに長くなります。



 var argv = require('optimist').argv; var env = process.env.NODE_ENV = process.env.NODE_ENV || 'development'; var mode = process.env.COLLECTOR_MODE = process.env.COLLECTOR_MODE || argv.mode || 'normal'; var scheduler = require('./source/engine/scheduler'); scheduler(mode).run();
      
      





スケジューラー



スケジューラは、Node.js用に記述されたwhile(true)



と本質的に同じです。 率直に言って、「同期」モードから「非同期」モードに思考を切り替えることは、私にとって最も簡単なプロセスではありませんでした。 Node.jsで無限の数のタスクを起動するのは困難なタスクのように思えました;考えの結果、この質問SOで生まれました



1つのオプションはasync.queueを使用することでしたが 、これは明らかなように見えましたが、このタスクには最適ではありませんでした。 何度か試行した結果、setTimeoutが非同期while(true)に最適なオプションであることが判明しました。



 function scheduler (mode) { function schedulerLoop() { runCollectingTasks(restartScheduler); } function restartScheduler (err, results) { if (err) { logger.error(err); } // http://stackoverflow.com/questions/16072699/nodejs-settimeout-memory-leak var timeout = setTimeout(schedulerLoop, config.collector.schedulerRestart); } // ... return { run: function () { schedulerLoop(); } }; }
      
      







ここでは、Node.jsのデーモンの非常に落とし穴-メモリリークに注意する必要があります。 コレクターが長時間働いた後、彼はその時点で大量のメモリを消費し始め、最も予期せぬ瞬間が止まったことに気付きました。 SOに関する質問のあるコード内のコメントに注意してください。 var timeout =



を追加した後、状況は改善されましたが、根本的には改善されませんでした。



もう1つの理由は、メモリリークに関する壮大な投稿とJoyentとWallmartのエンジニアによる調査の後です。 Node.js v0.10.22への移行により、コレクターはさらに安定して動作するようになりました。 それにもかかわらず、自発的な停止が発生し、その理由を理解することは非常に困難です。



ネットワークと状態



ユーザーが新しいネットワークに接続すると、ユーザーID、アクセストークン、その他のサービス情報を含むドキュメントがnetworks



コレクションに配置されます。 同じドキュメントで、コレクターは現在の状態(動作モード、エラーの有無、エラーの数、処理中の現在のデータページ)を非正規化します。 つまり 実際、これは実行可能なタスクが作成される基礎となる同じドキュメントです。



 function runCollectingTasks(callback) { prepareCollectingTasks(function (err, tasks) { if (err) { return callback(err); } runTasks(tasks, 'collecting', callback); }); } function prepareCollectingTasks(callback) { networks.findByMode(mode, function (err, states) { if (err) { return callback({message: 'error during networks query', err: err}); } if (!states) { return callback({message: 'failed to read networks states'}); } callback(null, createCollectingTasks(states)); }); }
      
      







タスク



状態に基づいて、実行可能なタスクのリストを作成します。 人気のあるサービスのほとんどすべてのオープンAPIには、期間ごとのリクエスト数に制限があります。 コレクターのタスクは、最も効率的な数のリクエストを処理することであり、レート制限に達することではありません。



現在の時点より後にスケジュールされたタスクのみが実行を許可されます。



 function createCollectingTasks(states) { var tasks = states.map(function (state) { return allowedToExecute(state) ? collectingTask(state) : null; }).filter(function (task) { return task !== null; }); return tasks; } function allowedToExecute (state) { return moment().diff(state.scheduledTo) > 0; } function collectingTask(state) { return function (callback) { return executor(state, connectors, callback); }; }
      
      







データの配列は、 runTasks



入力にrunTasks



れる関数の配列に変換されます。 runTasks



は、 async.series



を介して各タスクを順番に実行します。



 function runTasks(tasks, type, callback) { async.series(tasks, function (err) { if (err) { // report error but continue execution to do not break execution chain.. logger.error(err); } callback(null, tasks.length); }); }
      
      







執行者



現在の状態、既存のコネクタのリスト、およびコールバック関数を取得する一般化された関数(その本質を示すためにすべてのエラー処理とログを削除しました)。



 function executor(state, connectors, callback) { var service = state.service; var connector = connectors[service]; var connectorStarted = moment(); connector(state, user, connectorExecuted); function connectorExecuted(err, updatedState, results) { saveConnectorState(state, connectorStateSaved); function saveConnectorState (state, callback) { // ... } function connectorStateSaved (err) { // ... saveConnectorResults(results, connectorResultsSaved); } function saveConnectorResults(results, callback) { // ... connectorResultsSaved(results, connectorResultsSaved); } function connectorResultsSaved (err, saveDuration) { // ... callback(null); } } }
      
      







コネクター



コネクタは、APIへのHTTP要求を実装し、その応答を処理し、タスクのステータスを更新し(次回の実行予定)、収集したデータを返す関数です。 「いいね」のコレクションがどの状態にあるかを区別するのはコネクタです。これに応じて、必要な要求を行います(目的の要求URIを作成します)。



現時点では、9つのコネクタを実装していますが、そのコードはほぼ同じです。 最初は、既製のAPIアクセスライブラリを常に探していましたが、これは間違った戦略であることが判明しました。いくつかのオプションから選択する必要があります。 最も柔軟で簡単なソリューションは、 リクエスト (Node.jsに最適なHTTPクライアント)を使用することでした。



GitHubのコードの例を示します(ここでも、本質を示すために詳細を短縮します)。



 var API = 'https://api.github.com'; function connector(state, user, callback) { var accessToken = state.accessToken; if (!accessToken) { return callback('missing accessToken for user: ' + state.user); } initState(state); var uri = formatRequestUri(accessToken, state); var headers = { 'Content-Type': 'application/json', 'User-Agent': 'likeastore/collector'}; request({uri: uri, headers: headers, timeout: config.collector.request.timeout, json: true}, function (err, response, body) { if (err) { return handleUnexpected(response, body, state, err, function (err) { callback (err, state); }); } return handleResponse(response, body); }); function initState(state) { // intialize state fields (page, errors, mode etc.) ... } function formatRequestUri(accessToken, state) { // create request URI based on values from state ... } function handleResponse(response, body) { var rateLimit = +response.headers['x-ratelimit-remaining']; var stars = body.map(function (r) { return { // transforms response into likeastore item }; }); return callback(null, scheduleTo(updateState(state, stars, rateLimit, false)), stars); } function updateState(state, data, rateLimit, failed) { state.lastExecution = moment().toDate(); // update other state fields (next page, mode) ... return state; } }
      
      







スケジュール



最後に、コネクタが完了してステータスが更新されたら、次の開始時間を計算する必要があります。 APIの制限とコレクターのモードに基づいて計算されます(初期モードの場合、一時停止は最小限で、通常モードの場合はより長く、通常は15分です。コネクターがレート制限に達すると一時停止が最大になります)。



 function scheduleTo(state) { var currentMoment = moment(); var service = state.service; var scheduleForMode = { initial: config.collector.quotes[service].runAfter, normal: config.collector.nextNormalRunAfter, rateLimit: config.collector.nextRateLimitRunAfter }; var next = scheduleForMode[state.mode]; var scheduledTo = currentMoment.add(next, 'milliseconds'); state.scheduledTo = scheduledTo.toDate(); return state; }
      
      







昨年の9月頃に「落ち着いた」簡単なコードがありますが、それ以降に追加するのは新しいコネクタのみで、エンジン自体は変更されていません。 Node.jsでタスクキューを実行するために別のライブラリを割り当てることを考えていますが、これが一般的なタスクであるかどうかはわかりません。



時間がなくなり、ユーザーの数が増えており、現在、3,000のタスクの処理には約30分かかります。これは非常に長い時間です(サイクルタイムを15分以内に維持しようとしています)。 将来的には、コレクターのアーキテクチャは、メッセージキューの方向とコレクターの分離において、動作モードではなく、水平スケーラビリティを容易にするための別の基準(ネットワークタイプ、ユーザークラスター)に従って変わると思います。



All Articles