少し前まで、Node.jsで悪魔(ワーカー)を実装して、当社が開発したすべてのプロジェクトで使用できるようにすることができました。 多くの場合、複数のサーバー上のビデオファイルを分散方式で処理および配布する必要があるプロジェクトを開発するため、このための既製のツールが必要です。
問題文:
- 開発はNode.jsで行う必要があります。 私たちは長い間、すべてのプロジェクトの開発にこのプラットフォームを使用してきたので、ここでは合理的な選択です。
- クラスター内のすべてのノードは同等でなければなりません。 特別なマネージャーやマスターはいるべきではありません。 そうしないと、ウィザードを停止すると、クラスター全体が停止する場合があります。
- タスクはMySQLテーブルにある必要があります。 MQを使用するよりもはるかに柔軟で有益です。 常にすべてのタスクにアクセスしたり、キューを評価したり、別のノードに再割り当てしたりできます。
- 各ワーカーは、一度に複数のタスクを処理できる必要があります。
すぐにGitHubへのリンクを提供してください :( https://github.com/pipll/node-daemons )。
ランニングワーカー
各ワーカーは個別のNode.jsプロセスです。 ワーカープロセスを作成するには、 クラスターの組み込みモジュールが使用されます。 彼はまた、労働者の転倒を制御し、彼らを再起動します。
'use strict'; const config = require('./config/config'); const _ = require('lodash'); const path = require('path'); const cluster = require('cluster'); const logger = require('log4js').getLogger('app'); const models = require('./models'); // let shutdownInterval = null; if (cluster.isMaster) { // _.each(config.workers, (conf, name) => { if (conf.enabled) { startWorker(name); } }); } else { // let name = process.env.WORKER_NAME; let WorkerClass = require(path.join(__dirname, 'workers', name + '.js')); let worker = null; if (WorkerClass) { worker = new WorkerClass(name, config.workers[name]); // worker.start(); // , worker.on('stop', () => { process.exit(); }); } // process.on('message', message => { if ('shutdown' === message) { if (worker) { worker.stop(); } else { process.exit(); } } }); } // Shutdown process.on('SIGTERM', shutdownCluster); process.on('SIGINT', shutdownCluster); // function startWorker(name) { let worker = cluster.fork({WORKER_NAME: name}).on('online', () => { logger.info('Start %s worker #%d.', name, worker.id); }).on('exit', status => { // if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) { // , logger.info('Worker %s #%d was killed.', name, worker.id); } else { // , logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id); startWorker(name); } }); } // function shutdownCluster() { if (cluster.isMaster) { clearInterval(shutdownInterval); if (_.size(cluster.workers) > 0) { // logger.info('Shutdown workers:', _.size(cluster.workers)); _.each(cluster.workers, worker => { try { worker.send('shutdown'); } catch (err) { logger.warn('Cannot send shutdown message to worker:', err); } }); // shutdownInterval = setInterval(() => { if (_.size(cluster.workers) === 0) { process.exit(); } }, config.shutdownInterval); } else { process.exit(); } } }
いくつかの点に注意を喚起したいと思います。
- ワーカーを開始するには、プロセスのフォークが使用され、開始されたワーカーの名前が環境変数WORKER_NAMEで渡されます。
- ワーカーが制御不能にシャットダウンした場合、再起動します。
- ワーカーを制御された方法で制御するために、 シャットダウン信号を送信します。 ワーカーはこのイベントに応答し、タスクの完了後に
process.exit()
ます。 - ウィザードは
setInterval
でワーカーの数を監視し、すべてのワーカーが停止すると、process.exit()
が実行します。
ベースワーカーコンポーネント
このコンポーネントは定期的なプロセスを実行するように設計されており、タスクキューでは機能しません。
'use strict'; const _ = require('lodash'); const Promise = require('bluebird'); const log4js = require('log4js'); const EventEmitter = require('events'); const WorkerStates = require('./worker_states'); class Worker extends EventEmitter { constructor(name, conf) { super(); this.name = name; // this.conf = _.defaults({}, conf, { sleep: 1000 // }); this.logger = log4js.getLogger('worker-' + name); // this.stopped = true; // loop this.timer = null; // this.state = null; } // start() { this.logger.info('Start'); this.stopped = false; this.state = WorkerStates.STATE_IDLE; return this._startLoop(); } // stop() { this.logger.info('Stop'); this.stopped = true; if (this.state === WorkerStates.STATE_IDLE) { // if (this.timer) { clearTimeout(this.timer); this.timer = null; } this.state = WorkerStates.STATE_STOP; // this.emit('stop'); } } // loop() { return Promise.resolve(); } // loop _startLoop() { this.state = WorkerStates.STATE_WORK; return this.loop().catch(err => { this.logger.warn('Loop error:', err); }).finally(() => { this.state = WorkerStates.STATE_IDLE; if (!this.stopped) { // loop this.timer = setTimeout(() => { this._startLoop(); }, this.conf.sleep); } else { this.state = WorkerStates.STATE_STOP; // this.emit('stop'); } }); } } module.exports = Worker;
最も単純なワーカーコードは次のようになります。
'use strict'; const Promise = require('bluebird'); const Worker = require('../components/worker'); class Sample extends Worker { loop() { this.logger.info("Loop method"); return Promise.resolve().delay(30000); } } module.exports = Sample;
いくつかの機能:
-
loop
メソッドは、子孫の継承とビジネスタスクの実装を目的としています。 このメソッドの戻り値はPromise
必要があります。 -
loop
メソッドの最後に、ワーカーの設定で指定された時間が経過した後に再び開始します。 - ワーカーには3つの状態があります。
- STATE_IDLE-
loop
メソッドの開始間の一時停止中。 - STATE_WORK-
loop
メソッドの実行中。 - STATE_STOP-ワーカーを停止した後。
- STATE_IDLE-
ワーカーワーカーコンポーネント
これは、MySQLテーブルのタスクの並列処理用に設計されたメインコンポーネントです。
'use strict'; const config = require('../config/config'); const _ = require('lodash'); const Promise = require('bluebird'); const Worker = require('./worker'); const WorkerStates = require('./worker_states'); const models = require('../models'); class TaskWorker extends Worker { constructor(name, conf) { super(name, conf); // this.conf = _.defaults({}, this.conf, { maxAttempts: 3, // delayRatio: 300000, // count: 1, // queue: '', // update: 3000 // }); // this.count = 0; } loop() { if (this.count < this.conf.count && !this.stopped) { // return this._getTask().then(task => { if (task) { // this.count++; // let interval = setInterval(() => { return models.sequelize.transaction(t => { return task.touch({transaction: t}); }); }, this.conf.update); // this.handleTask(task.get({plain: true})).then(() => { // return models.sequelize.transaction(t => { return task.complete({transaction: t}).then(() => { this.logger.info('Task completed:', task.id); }); }); }).catch(err => { // - this.logger.warn('Handle error:', err); return this.delay(task).then(delay => { return models.sequelize.transaction(t => { return task.fail(delay, {transaction: t}).then(() => { this.logger.warn('Task failed:', task.id); }); }); }); }).finally(() => { clearInterval(interval); this.count--; }).done(); return null; } }); } else { return Promise.resolve(); } } // handleTask() { return Promise.resolve(); } // delay(task) { return Promise.resolve().then(() => { return task.attempts * this.conf.delayRatio; }); } // _getTask() { return models.sequelize.transaction({autocommit: false}, t => { return models.Task.scope({ method: ['forWork', this.conf.queue, config.node_id] }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => { if (task) { return task.work(config.node_id, {transaction: t}); } }); }); } _startLoop() { this.state = WorkerStates.STATE_WORK; return this.loop().catch(err => { this.logger.warn('Loop error:', err); }).finally(() => { if (this.count === 0) { this.state = WorkerStates.STATE_IDLE; } if (this.stopped && this.count === 0) { this.state = WorkerStates.STATE_STOP; this.emit('stop'); } else { this.timer = setTimeout(() => { this._startLoop(); }, this.conf.sleep); } }); } } module.exports = TaskWorker;
最も単純なワーカーコードは次のようになります。
'use strict'; const Promise = require('bluebird'); const TaskWorker = require('../components/task_worker'); class Sample extends TaskWorker { handleTask(task) { this.logger.info('Sample Task:', task); return Promise.resolve().delay(30000); } } module.exports = Sample;
コンポーネントの機能:
- データベースからタスクを取得するには、自動コミットを無効にした1つのトランザクションで、
SELECT ... FOR UPDATE
コンストラクトとデータベース内の後続のUPDATE
レコードを使用します。 これにより、複数のサーバーからの同時リクエストがあっても、タスクへの排他的アクセスを取得できます。 - タスクの処理中に、データベース内のタスクのステータスを更新する定期的なプロセスが開始されます。 これは、長時間実行されるタスクと、ステータスを更新せずに予期せずに完了したタスクを区別するために必要です。
- 処理されたタスクのステータスは、
handleTask
メソッドによって返されるPromise
ステータスによってhandleTask
ます。 成功すると、タスクは完了としてマークされます。 それ以外の場合、タスクは失敗としてマークされ、delay
メソッドで設定されたdelay
で開始されます。
タスクモデル
sequelizeモジュールは、データベースモデルを操作するために使用されます。 すべてのタスクはタスクテーブルにあります。 テーブルの構造は次のとおりです。
フィールド | 種類 | 説明 |
---|---|---|
id
| 整数、自動インクリメント | タスクID |
node_id
| 整数、ヌル可能 | タスクが対象とするノードのID |
queue
| ひも | タスクキュー |
status
| 列挙型 | タスクの状態 |
attempts
| 整数 | 実行中のタスクの数 |
priority
| 整数 | タスクの優先度 |
body
| ひも | JSON形式のタスクの本文 |
start_at
| 日時、ヌル可能 | タスク処理の開始日時 |
finish_at
| 日時、ヌル可能 | タスク処理の完了日時(TTLアナログ) |
worker_node_id
| 整数、ヌル可能 | タスクの処理を開始したノードのID |
worker_started_at
| 日時、ヌル可能 | タスク処理の開始日時 |
checked_at
| 日時、ヌル可能 | タスクのステータスを更新した日付と時刻 |
created_at
| 日時、ヌル可能 | タスク作成の日付と時刻 |
updated_at
| 日時、ヌル可能 | タスク変更の日付と時刻 |
タスクは、特定のノードまたはクラスター内の任意のノードに割り当てることができます。 これは、タスクの作成中にnode_id
フィールドによって規制されます。 タスクは、遅延( start_at
フィールド)および処理時間制限( finish_at
フィールド)で開始できます。
さまざまなワーカーが、 queue
フィールドで定義されたさまざまなタスクキューを操作します。 処理の優先度はpriority
フィールドに設定されます(値が大きいほど優先度が高くなります)。 タスクの再起動の回数は、 attempts
フィールドに保存されます。 JSON形式のタスクのbody
( body
フィールド)では、ワーカーに必要なパラメーターが転送されます。
checked_at
フィールドは、実行中のタスクの兆候として機能します。 その値は、タスク中に絶えず変化しています。 checked_at
フィールドの値が長時間変更されておらず、タスクが作業ステータスにある場合、タスクは失敗とみなされ、そのステータスはfailureに変更されます。
'use strict'; const moment = require('moment'); module.exports = function(sequelize, Sequelize) { return sequelize.define('Task', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true }, node_id: { type: Sequelize.INTEGER }, queue: { type: Sequelize.STRING, allowNull: false }, status: { type: Sequelize.ENUM, values: ['pending', 'working', 'done', 'failure'], defaultValue: 'pending', allowNull: false }, attempts: { type: Sequelize.INTEGER, defaultValue: 0, allowNull: false }, priority: { type: Sequelize.INTEGER, defaultValue: 10, allowNull: false }, body: { type: Sequelize.TEXT, set: function(body) { return this.setDataValue('body', JSON.stringify(body)); }, get: function() { try { return JSON.parse(this.getDataValue('body')); } catch (e) { return null; } } }, start_at: { type: Sequelize.DATE }, finish_at: { type: Sequelize.DATE }, worker_node_id: { type: Sequelize.INTEGER }, worker_started_at: { type: Sequelize.DATE }, checked_at: { type: Sequelize.DATE } }, { tableName: 'tasks', freezeTableName: true, underscored: true, scopes: { forWork: function(queue, node_id) { return { where: { node_id: { $or: [ null, node_id ] }, queue: queue, status: 'pending', start_at: { $or: [ null, { $lt: moment().toDate() } ] }, finish_at: { $or: [ null, { $gte: moment().toDate() } ] } }, order: [ ['priority', 'DESC'], ['attempts', 'ASC'], [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC'] ] }; } }, instanceMethods: { fail: function(delay, options) { this.start_at = delay ? moment().add(delay, 'ms').toDate() : null; this.attempts = sequelize.literal('attempts + 1'); this.status = 'failure'; return this.save(options); }, complete: function(options) { this.status = 'done'; return this.save(options); }, work: function(node_id, options) { this.status = 'working'; this.worker_node_id = node_id; this.worker_started_at = moment().toDate(); return this.save(options); }, check: function(options) { this.checked_at = moment().toDate(); return this.save(options); } } }); };
タスクのライフサイクル
すべてのタスクは次のライフサイクルを経ます。
- ステータスが保留中の新しいタスクが作成されます。
- タスクを処理する時間になると、最初の空きワーカーがタスクを取得し、 作業ステータスに転送して、フィールド
worker_node_id
およびworker_started_at
ます。 - タスクの処理中、ワーカーは一定の周期(デフォルトでは10秒ごと)で、
checked_at
フィールドを更新して正しい操作をchecked_at
します。 - タスクの作業の最後に、イベントの開発のためのいくつかのオプションがあります。
4.1。 タスクが正常に完了した場合、タスクは完了ステータスに移行されます。
4.2。 タスクが完了すると、タスクは失敗状態に移行し、attempts
回数が増加し、指定された時間(試行回数に基づいてdelay
メソッドで計算され、delayRatio
を設定して)起動がdelay
します。
プロジェクトには、各ノードで実行され、次のタスク処理を実行するビルトインManager
モジュールもあります。
- 中断されたタスクを失敗ステータスにします。
- 失敗したタスクを新しい処理のために保留状態に変換します。
- すでに期限切れになっている未処理のタスクを削除します。
- 正常に完了したタスクを1時間の遅延で削除します(構成可能)。
- 失敗した完了タスクを削除します。3日間の遅延で構成された起動試行回数を使用します。
さらに、このモジュールはノードの有効化/無効化で機能します。
'use strict'; const _ = require('lodash'); const moment = require('moment'); const Promise = require('bluebird'); const Worker = require('../components/worker'); const models = require('../models'); const config = require('../config/config'); class Manager extends Worker { constructor(name, conf) { super(name, conf); this.conf = _.defaults({}, this.conf, { maxUpdate: 30000, // 30 seconds maxCompleted: 3600000, // 1 hour maxFailed: 259200000 // 3 days }); } loop() { return models.sequelize.transaction(t => { return Promise.resolve() .then(() => { return this._checkCurrentNode(t); }) .then(() => { return this._activateNodes(t); }) .then(() => { return this._pauseNodes(t); }) .then(() => { return this._restoreFrozenTasks(t); }) .then(() => { return this._restoreFailedTasks(t); }) .then(() => { return this._deleteDeadTasks(t); }) .then(() => { return this._deleteCompletedTasks(t); }) .then(() => { return this._deleteFailedTasks(t); }); }); } _checkCurrentNode(t) { return models.Node.findById(config.node_id, {transaction: t}).then(node => { if (node) { return node.check(); } }); } _activateNodes(t) { return models.Node.update({ is_active: true }, { where: { is_active: false, checked_at: { $gte: moment().subtract(2 * this.conf.sleep).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Activate nodes:', count); } }); } _pauseNodes(t) { return models.Node.update({ is_active: false }, { where: { is_active: true, checked_at: { $lt: moment().subtract(2 * this.conf.sleep).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Pause nodes:', count); } }); } _restoreFrozenTasks(t) { return models.Task.update({ status: 'failure', attempts: models.sequelize.literal('attempts + 1') }, { where: { status: 'working', checked_at: { $lt: moment().subtract(this.conf.maxUpdate).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Restore frozen tasks:', count); } }); } _restoreFailedTasks(t) { let where = [{status: 'failure'}]; let conditions = this._failedTasksConditions(); if (conditions.length) { where.push({$or: conditions}); } return models.Task.update({ status: 'pending', worker_node_id: null, worker_started_at: null }, { where: where, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Restore failure tasks:', count); } }); } _deleteDeadTasks(t) { return models.Task.destroy({ where: { status: 'pending', finish_at: { $lt: moment().toDate() } }, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete dead tasks:', count); } }); } _deleteCompletedTasks(t) { return models.Task.destroy({ where: { status: 'done', checked_at: { $lt: moment().subtract(this.conf.maxCompleted).toDate() } }, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete completed tasks:', count); } }); } _deleteFailedTasks(t) { let where = [ {status: 'failure'}, {checked_at: { $lt: moment().subtract(this.conf.maxFailed).toDate() }} ]; let conditions = this._failedTasksConditions(); if (conditions.length) { where.push({$or: conditions}); } return models.Task.destroy({ where: where, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete failed tasks:', count); } }); } _failedTasksConditions() { let conditions = []; _.each(config.workers, (worker) => { if (worker.queue) { let item = {queue: worker.queue}; if (worker.maxAttempts !== undefined) { item.attempts = { $lt: worker.maxAttempts }; } conditions.push(item); } }); return conditions; } } module.exports = Manager;
結論と将来の計画
一般に、バックグラウンドタスクを操作するための優れた、かなり信頼性の高いツールであることが判明しました。これはコミュニティで共有できます。 私たちは、このプロジェクトで使用される基本的なアイデアと原則を、さまざまなプロジェクトでの数年間の作業で開発しました。
プロジェクト開発計画:
- マスタープロセスを再起動せずにワーカーをホットリロードします。 そのため、他の作業を妨げることなく、個々のワーカーのコードを更新できます。
- タスクの進行状況(フィールド
progress
)に関する情報を追加し、この情報を更新するメソッドをTaskWorker
モジュールに追加します。 - タスクとノードを操作するためのさまざまなインターフェースの作成:
- CLI
- Web
- API
- 基本的なワーカーの作成:
- ビデオファイルの処理。
- 信頼できるストレージのためのサーバー間のファイル複製。
- 労働者のためのテスト。
私は建設的な批判を歓迎し、コメントに興味のある質問に答えます。