Node.jsでのデーモンの実装

少し前まで、Node.jsで悪魔(ワーカー)を実装して、当社が開発したすべてのプロジェクトで使用できるようにすることができました。 多くの場合、複数のサーバー上のビデオファイルを分散方式で処理および配布する必要があるプロジェクトを開発するため、このための既製のツールが必要です。







問題文:









すぐにGitHubへのリンクを提供してください :( https://github.com/pipll/node-daemons )。







ランニングワーカー



各ワーカーは個別のNode.jsプロセスです。 ワーカープロセスを作成するには、 クラスターの組み込みモジュールが使用されます。 彼はまた、労働者の転倒を制御し、彼らを再起動します。







ワーカー起動コード
'use strict'; const config = require('./config/config'); const _ = require('lodash'); const path = require('path'); const cluster = require('cluster'); const logger = require('log4js').getLogger('app'); const models = require('./models'); //         let shutdownInterval = null; if (cluster.isMaster) { //   _.each(config.workers, (conf, name) => { if (conf.enabled) { startWorker(name); } }); } else { //   let name = process.env.WORKER_NAME; let WorkerClass = require(path.join(__dirname, 'workers', name + '.js')); let worker = null; if (WorkerClass) { worker = new WorkerClass(name, config.workers[name]); //   worker.start(); //   ,    worker.on('stop', () => { process.exit(); }); } //      process.on('message', message => { if ('shutdown' === message) { if (worker) { worker.stop(); } else { process.exit(); } } }); } // Shutdown process.on('SIGTERM', shutdownCluster); process.on('SIGINT', shutdownCluster); //    function startWorker(name) { let worker = cluster.fork({WORKER_NAME: name}).on('online', () => { logger.info('Start %s worker #%d.', name, worker.id); }).on('exit', status => { //     if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) { //     ,      logger.info('Worker %s #%d was killed.', name, worker.id); } else { //     ,      logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id); startWorker(name); } }); } //    function shutdownCluster() { if (cluster.isMaster) { clearInterval(shutdownInterval); if (_.size(cluster.workers) > 0) { //      logger.info('Shutdown workers:', _.size(cluster.workers)); _.each(cluster.workers, worker => { try { worker.send('shutdown'); } catch (err) { logger.warn('Cannot send shutdown message to worker:', err); } }); //     shutdownInterval = setInterval(() => { if (_.size(cluster.workers) === 0) { process.exit(); } }, config.shutdownInterval); } else { process.exit(); } } }
      
      





いくつかの点に注意を喚起したいと思います。









ベースワーカーコンポーネント



このコンポーネントは定期的なプロセスを実行するように設計されており、タスクキューでは機能しません。







ベースワーカーコード
 'use strict'; const _ = require('lodash'); const Promise = require('bluebird'); const log4js = require('log4js'); const EventEmitter = require('events'); const WorkerStates = require('./worker_states'); class Worker extends EventEmitter { constructor(name, conf) { super(); this.name = name; //     this.conf = _.defaults({}, conf, { sleep: 1000 //    }); this.logger = log4js.getLogger('worker-' + name); //    this.stopped = true; //      loop  this.timer = null; //   this.state = null; } //    start() { this.logger.info('Start'); this.stopped = false; this.state = WorkerStates.STATE_IDLE; return this._startLoop(); } //    stop() { this.logger.info('Stop'); this.stopped = true; if (this.state === WorkerStates.STATE_IDLE) { //    if (this.timer) { clearTimeout(this.timer); this.timer = null; } this.state = WorkerStates.STATE_STOP; //     this.emit('stop'); } } //      loop() { return Promise.resolve(); } //     loop  _startLoop() { this.state = WorkerStates.STATE_WORK; return this.loop().catch(err => { this.logger.warn('Loop error:', err); }).finally(() => { this.state = WorkerStates.STATE_IDLE; if (!this.stopped) { //       loop  this.timer = setTimeout(() => { this._startLoop(); }, this.conf.sleep); } else { this.state = WorkerStates.STATE_STOP; //     this.emit('stop'); } }); } } module.exports = Worker;
      
      





最も単純なワーカーコードは次のようになります。







 'use strict'; const Promise = require('bluebird'); const Worker = require('../components/worker'); class Sample extends Worker { loop() { this.logger.info("Loop method"); return Promise.resolve().delay(30000); } } module.exports = Sample;
      
      





いくつかの機能:









ワーカーワーカーコンポーネント



これは、MySQLテーブルのタスクの並列処理用に設計されたメインコンポーネントです。







タスク処理ワーカーコード
 'use strict'; const config = require('../config/config'); const _ = require('lodash'); const Promise = require('bluebird'); const Worker = require('./worker'); const WorkerStates = require('./worker_states'); const models = require('../models'); class TaskWorker extends Worker { constructor(name, conf) { super(name, conf); //     this.conf = _.defaults({}, this.conf, { maxAttempts: 3, //      delayRatio: 300000, //      count: 1, //      queue: '', //      update: 3000 //     }); //     this.count = 0; } loop() { if (this.count < this.conf.count && !this.stopped) { //    return this._getTask().then(task => { if (task) { //      this.count++; //      let interval = setInterval(() => { return models.sequelize.transaction(t => { return task.touch({transaction: t}); }); }, this.conf.update); //     this.handleTask(task.get({plain: true})).then(() => { //   return models.sequelize.transaction(t => { return task.complete({transaction: t}).then(() => { this.logger.info('Task completed:', task.id); }); }); }).catch(err => { //     -   this.logger.warn('Handle error:', err); return this.delay(task).then(delay => { return models.sequelize.transaction(t => { return task.fail(delay, {transaction: t}).then(() => { this.logger.warn('Task failed:', task.id); }); }); }); }).finally(() => { clearInterval(interval); this.count--; }).done(); return null; } }); } else { return Promise.resolve(); } } //    handleTask() { return Promise.resolve(); } //        delay(task) { return Promise.resolve().then(() => { return task.attempts * this.conf.delayRatio; }); } //      _getTask() { return models.sequelize.transaction({autocommit: false}, t => { return models.Task.scope({ method: ['forWork', this.conf.queue, config.node_id] }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => { if (task) { return task.work(config.node_id, {transaction: t}); } }); }); } _startLoop() { this.state = WorkerStates.STATE_WORK; return this.loop().catch(err => { this.logger.warn('Loop error:', err); }).finally(() => { if (this.count === 0) { this.state = WorkerStates.STATE_IDLE; } if (this.stopped && this.count === 0) { this.state = WorkerStates.STATE_STOP; this.emit('stop'); } else { this.timer = setTimeout(() => { this._startLoop(); }, this.conf.sleep); } }); } } module.exports = TaskWorker;
      
      





最も単純なワーカーコードは次のようになります。







 'use strict'; const Promise = require('bluebird'); const TaskWorker = require('../components/task_worker'); class Sample extends TaskWorker { handleTask(task) { this.logger.info('Sample Task:', task); return Promise.resolve().delay(30000); } } module.exports = Sample;
      
      





コンポーネントの機能:









タスクモデル



sequelizeモジュールは、データベースモデルを操作するために使用されます。 すべてのタスクはタスクテーブルにあります。 テーブルの構造は次のとおりです。







フィールド 種類 説明
id



整数、自動インクリメント タスクID
node_id



整数、ヌル可能 タスクが対象とするノードのID
queue



ひも タスクキュー
status



列挙型 タスクの状態
attempts



整数 実行中のタスクの数
priority



整数 タスクの優先度
body



ひも JSON形式のタスクの本文
start_at



日時、ヌル可能 タスク処理の開始日時
finish_at



日時、ヌル可能 タスク処理の完了日時(TTLアナログ)
worker_node_id



整数、ヌル可能 タスクの処理を開始したノードのID
worker_started_at



日時、ヌル可能 タスク処理の開始日時
checked_at



日時、ヌル可能 タスクのステータスを更新した日付と時刻
created_at



日時、ヌル可能 タスク作成の日付と時刻
updated_at



日時、ヌル可能 タスク変更の日付と時刻


タスクは、特定のノードまたはクラスター内の任意のノードに割り当てることができます。 これは、タスクの作成中にnode_id



フィールドによって規制されます。 タスクは、遅延( start_at



フィールド)および処理時間制限( finish_at



フィールド)で開始できます。







さまざまなワーカーが、 queue



フィールドで定義されたさまざまなタスクキューを操作します。 処理の優先度はpriority



フィールドに設定されます(値が大きいほど優先度が高くなります)。 タスクの再起動の回数は、 attempts



フィールドに保存されます。 JSON形式のタスクのbody



body



フィールド)では、ワーカーに必要なパラメーターが転送されます。







checked_at



フィールドは、実行中のタスクの兆候として機能します。 その値は、タスク中に絶えず変化しています。 checked_at



フィールドの値が長時間変更されておらず、タスクが作業ステータスにある場合、タスクは失敗とみなされ、そのステータスはfailureに変更されます。







タスクモデルコード
 'use strict'; const moment = require('moment'); module.exports = function(sequelize, Sequelize) { return sequelize.define('Task', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true }, node_id: { type: Sequelize.INTEGER }, queue: { type: Sequelize.STRING, allowNull: false }, status: { type: Sequelize.ENUM, values: ['pending', 'working', 'done', 'failure'], defaultValue: 'pending', allowNull: false }, attempts: { type: Sequelize.INTEGER, defaultValue: 0, allowNull: false }, priority: { type: Sequelize.INTEGER, defaultValue: 10, allowNull: false }, body: { type: Sequelize.TEXT, set: function(body) { return this.setDataValue('body', JSON.stringify(body)); }, get: function() { try { return JSON.parse(this.getDataValue('body')); } catch (e) { return null; } } }, start_at: { type: Sequelize.DATE }, finish_at: { type: Sequelize.DATE }, worker_node_id: { type: Sequelize.INTEGER }, worker_started_at: { type: Sequelize.DATE }, checked_at: { type: Sequelize.DATE } }, { tableName: 'tasks', freezeTableName: true, underscored: true, scopes: { forWork: function(queue, node_id) { return { where: { node_id: { $or: [ null, node_id ] }, queue: queue, status: 'pending', start_at: { $or: [ null, { $lt: moment().toDate() } ] }, finish_at: { $or: [ null, { $gte: moment().toDate() } ] } }, order: [ ['priority', 'DESC'], ['attempts', 'ASC'], [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC'] ] }; } }, instanceMethods: { fail: function(delay, options) { this.start_at = delay ? moment().add(delay, 'ms').toDate() : null; this.attempts = sequelize.literal('attempts + 1'); this.status = 'failure'; return this.save(options); }, complete: function(options) { this.status = 'done'; return this.save(options); }, work: function(node_id, options) { this.status = 'working'; this.worker_node_id = node_id; this.worker_started_at = moment().toDate(); return this.save(options); }, check: function(options) { this.checked_at = moment().toDate(); return this.save(options); } } }); };
      
      





タスクのライフサイクル



すべてのタスクは次のライフサイクルを経ます。







  1. ステータスが保留中の新しいタスクが作成されます。
  2. タスクを処理する時間になると、最初の空きワーカーがタスクを取得し、 作業ステータスに転送して、フィールドworker_node_id



    およびworker_started_at



    ます。
  3. タスクの処理中、ワーカーは一定の周期(デフォルトでは10秒ごと)で、 checked_at



    フィールドを更新して正しい操作をchecked_at



    します。
  4. タスクの作業の最後に、イベントの開発のためのいくつかのオプションがあります。

    4.1。 タスクが正常に完了した場合、タスクは完了ステータスに移行さます。

    4.2。 タスクが完了すると、タスクは失敗状態に移行し、 attempts



    回数が増加し、指定された時間(試行回数に基づいてdelay



    メソッドで計算され、 delayRatio



    を設定して)起動がdelay



    します。


プロジェクトには、各ノードで実行され、次のタスク処理を実行するビルトインManager



モジュールもあります。







  1. 中断されたタスクを失敗ステータスにします。
  2. 失敗したタスクを新しい処理のために保留状態に変換します。
  3. すでに期限切れになっている未処理のタスクを削除します。
  4. 正常に完了したタスクを1時間の遅延で削除します(構成可能)。
  5. 失敗した完了タスクを削除します。3日間の遅延で構成された起動試行回数を使用します。


さらに、このモジュールはノードの有効化/無効化で機能します。







マネージャーモジュールコード
 'use strict'; const _ = require('lodash'); const moment = require('moment'); const Promise = require('bluebird'); const Worker = require('../components/worker'); const models = require('../models'); const config = require('../config/config'); class Manager extends Worker { constructor(name, conf) { super(name, conf); this.conf = _.defaults({}, this.conf, { maxUpdate: 30000, // 30 seconds maxCompleted: 3600000, // 1 hour maxFailed: 259200000 // 3 days }); } loop() { return models.sequelize.transaction(t => { return Promise.resolve() .then(() => { return this._checkCurrentNode(t); }) .then(() => { return this._activateNodes(t); }) .then(() => { return this._pauseNodes(t); }) .then(() => { return this._restoreFrozenTasks(t); }) .then(() => { return this._restoreFailedTasks(t); }) .then(() => { return this._deleteDeadTasks(t); }) .then(() => { return this._deleteCompletedTasks(t); }) .then(() => { return this._deleteFailedTasks(t); }); }); } _checkCurrentNode(t) { return models.Node.findById(config.node_id, {transaction: t}).then(node => { if (node) { return node.check(); } }); } _activateNodes(t) { return models.Node.update({ is_active: true }, { where: { is_active: false, checked_at: { $gte: moment().subtract(2 * this.conf.sleep).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Activate nodes:', count); } }); } _pauseNodes(t) { return models.Node.update({ is_active: false }, { where: { is_active: true, checked_at: { $lt: moment().subtract(2 * this.conf.sleep).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Pause nodes:', count); } }); } _restoreFrozenTasks(t) { return models.Task.update({ status: 'failure', attempts: models.sequelize.literal('attempts + 1') }, { where: { status: 'working', checked_at: { $lt: moment().subtract(this.conf.maxUpdate).toDate() } }, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Restore frozen tasks:', count); } }); } _restoreFailedTasks(t) { let where = [{status: 'failure'}]; let conditions = this._failedTasksConditions(); if (conditions.length) { where.push({$or: conditions}); } return models.Task.update({ status: 'pending', worker_node_id: null, worker_started_at: null }, { where: where, transaction: t }).spread(count => { if (count > 0) { this.logger.info('Restore failure tasks:', count); } }); } _deleteDeadTasks(t) { return models.Task.destroy({ where: { status: 'pending', finish_at: { $lt: moment().toDate() } }, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete dead tasks:', count); } }); } _deleteCompletedTasks(t) { return models.Task.destroy({ where: { status: 'done', checked_at: { $lt: moment().subtract(this.conf.maxCompleted).toDate() } }, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete completed tasks:', count); } }); } _deleteFailedTasks(t) { let where = [ {status: 'failure'}, {checked_at: { $lt: moment().subtract(this.conf.maxFailed).toDate() }} ]; let conditions = this._failedTasksConditions(); if (conditions.length) { where.push({$or: conditions}); } return models.Task.destroy({ where: where, transaction: t }).then(count => { if (count > 0) { this.logger.info('Delete failed tasks:', count); } }); } _failedTasksConditions() { let conditions = []; _.each(config.workers, (worker) => { if (worker.queue) { let item = {queue: worker.queue}; if (worker.maxAttempts !== undefined) { item.attempts = { $lt: worker.maxAttempts }; } conditions.push(item); } }); return conditions; } } module.exports = Manager;
      
      





結論と将来の計画



一般に、バックグラウンドタスクを操作するための優れた、かなり信頼性の高いツールであることが判明しました。これはコミュニティで共有できます。 私たちは、このプロジェクトで使用される基本的なアイデアと原則を、さまざまなプロジェクトでの数年間の作業で開発しました。







プロジェクト開発計画:









私は建設的な批判を歓迎し、コメントに興味のある質問に答えます。








All Articles