このクライアントを使用して、本質的にnet.Socketであるチャネル間で要求を分散できます。 これを行うには、チャネルの開閉方法を変更して、チャネルにリクエストを追加する必要があります。
説明する例では、 pgライブラリが使用されます。これは、データベースを備えたサーバーへのソケットを開くための機能を提供します。 同時に、ライブラリによって提供されるデフォルトの接続プール管理は、いかなる方法でも使用されません。
最初に、1つのエンティティ(接続)を管理するConnectクラスを検討します。
クラス接続
/* , "pg://USER:PASSWORD@HOST:PORT/DATABASE" */ function Connect(connString) { // this._connString = connString; // , this._isRun = false; // , "maxCount" this._maxQueryCount = 100; // , _nextTick this._worked = false; // , this._queryCount = 0; // "" this._emitter = new (require('events').EventEmitter); // "" var self = this; // "open", this._emitter.on('open', function() { self._arrayQuery = []; }); // , , this._emitter.on('error', function(err) { throw err; }); // , this._emitter.on('maxCount', function(message) { self._setMax = true; }); // , , // net.Socket pg.connect(this._connString, function(err, client, done) { if (err) { return self._emitter.emit('error', err); } // "" , self._client = client; // " " self._done = done; // ( ) self._emitter.emit('open'); }); } /* , "" */ Connect.prototype.on = function(typeEvent, func) { if(typeEvent == 'error') { // this._emitter.removeAllListeners('error'); } this._emitter.addListener(typeEvent, func); }; /* , */ Connect.prototype.start = function() { this._isRun = true; this._nextTick(); }; /* , */ Connect.prototype.stop = function() { this._isRun = false; }; /* , ( ) */ Connect.prototype.isFull = function() { return this._setMax; }; /* , (.. , ) */ Connect.prototype.close = function () { if(this._done) { this._emitter.emit('close'); this._done(); } else { this._emitter.emit('error', new Error('connect is not active')); } }; /* , */ Connect.prototype.queryQueue = function () { return this._arrayQuery; }; /* - . , , */ Connect.prototype.addQuery = function (query, params, cb) { if(!(typeof query == 'string')) { return this._emitter.emit('error', new Error('not valid query')); } if( !(typeof params == "object") || !(params instanceof Array) ) { return this._emitter.emit('error', new Error('not valid argument')); } this._queryCount++; this._arrayQuery.push({ query: query, params: params, callback: cb }); if(this._queryCount > this._maxQueryCount) { this._emitter.emit('maxCount', 'in queue added too many requests, the waiting time increases'); } this._nextTick(); }; /* */ Connect.prototype.maxQueryCount = function (count) { if(count) { this._maxQueryCount = count; } else { return this._maxQueryCount; } }; /* */ Connect.prototype.queryCount = function () { return this._queryCount; }; /* , , , , ( _arrayQuery) */ Connect.prototype._nextTick = function() { var self = this; if(this._worked) { return; } while(this._isRun && this._arrayQuery.length>0) { this._worked = true; var el = this._arrayQuery.shift(); // pg, this._client.query(el.query, el.params, function(err, result) { self._queryCount--; if(err) { return el.callback(err); } el.callback(null, result); if(self._queryCount==0) { self._emitter.emit('drain'); self._setMax = false; } }) } this._worked = false; };
接続を管理するBalancerクラスを直接:新しい接続を開き、冗長な接続を閉じ、接続をリクエストを分配し、サービスに単一の入力を提供します
クラスバランサー
/* , */ function Balancer(minCountConnect, maxCountConnect) { // this._maxCountConnect = maxCountConnect; // this._minCountConnect = minCountConnect; // this._connectArray = []; // this._closedConnect = []; // this._taskArray = []; // this._run = false; // this._emitter = new (require('events').EventEmitter); // this._init(); } /* , , */ Balancer.prototype._init = function() { this._cursor = 0; this.activQuery = 0; var self = this; var i=0; // , var cycle = function() { i++; if(i<self._minCountConnect) { self._addNewConnect(cycle); } else { self._emitter.emit('ready'); } }; this._addNewConnect(cycle); }; /* , , */ Balancer.prototype._addNewConnect = function(cb) { var self = this; var connect = new GPSconnect(connString); connect.on('open', function() { self._connectArray.push(connect); cb(); }); }; /* , "" */ Balancer.prototype._cycle = function(pos) { for (var i=pos;i<this._connectArray.length;i++) { if( !(this._connectArray[i].isFull()) ) break; } return i; }; /* , */ Balancer.prototype._next = function(connect, el) { connect.addQuery(el.query, el.params, el.cb); connect.start(); this._distribution(); }; /* - . "Round-robin" . , "", , */ Balancer.prototype._distribution = function() { if(this._taskArray.length>0) { var el = this._taskArray.shift(); this._cursor = this._cycle(this._cursor); var self = this; if(this._cursor<this._connectArray.length) { var connect = this._connectArray[this._cursor]; this._next(connect, el); this._cursor++; } else { this._cursor = this._cycle(0); if(this._cursor<this._connectArray.length) { var connect = this._connectArray[this._cursor]; this._next(connect, el); this._cursor++; } else if( this._connectArray.length<this._maxCountConnect) { self._addNewConnect(function() { self._cursor = self._connectArray.length-1; var connect = self._connectArray[self._cursor]; self._next(connect, el); }); } else { for (var i=0;i<this._connectArray.length;i++) { if( this.activQuery/this._connectArray.length > this._connectArray[i].queryCount() ) { break; } } if(i==this._connectArray.length) { i = 0; } this._cursor = i; var connect = this._connectArray[this._cursor]; this._next(connect, el); } } } else { this._run = false; } }; /* , "" */ Balancer.prototype.on = function(typeEvent, func) { this._emitter.addListener(typeEvent, func); }; /* , , "" */ Balancer.prototype._removeLoad = function() { var self = this; var temp = this._connectArray[0].maxQueryCount().toFixed(); var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp; if(currentCount< this._connectArray.length ) { while( this._connectArray.length != currentCount ) { var poppedConnect = this._connectArray.pop(); if(poppedConnect.queryCount()==0) { poppedConnect.close(); } else { poppedConnect.index = self._closedConnect.length; poppedConnect.on('drain', function() { poppedConnect.close(); self._closedConnect.slice(poppedConnect.index, 1); }); self._closedConnect.push(poppedConnect); } } } }; /* C , -, . tube, , . */ Balancer.prototype.addQuery = function(tube, query, params, cb) { this.activQuery++; var self = this; this._removeLoad(); var wrappCb = function() { self.activQuery--; cb.apply(this, arguments); }; this._taskArray.push({ query: query, params: params, cb: wrappCb }); if(!this._run) { this._run = true; this._distribution(); } };
これをすべて確認する方法は? テストでは、クエリ「select pg_sleep(1)」を使用します。これは1秒間実行され、データベースへのクエリをシミュレートします。
これらのリクエストのうち10,000件は、バランサによって処理され、最大101590ミリ秒で、接続リクエストの最大数は100で、チャネルの合計数の境界は10〜100のソケットです。
使用されたスクリプト:
var balancer = new Balancer(10,100); balancer.on('ready', function() { var y=0; var time = +new Date(); for(var i=0;i<10000; i++) { balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) { if(err) console.log(err); y++; if(y==10000) { console.log(balancer._connectArray.length); console.log(+new Date()-time); } }); } });
すべてのソースはgithubで入手できます。
クライアントは未だに未加工です。もちろん、多くの作業を完了/書き換える必要がありますので、あまりscらないでください。 必要に応じて、私はより緊密に行うことができます。