node.jsでのデータベース接続のプーリング

この記事では、nodejsツールによって作成された2つの抽象化クラスについて説明します。これらのクラスは、オープンチャネル(tcp-socket)を介してリクエストを配信する機能を提供します。 同時に、システムの全体的な負荷が考慮され、十分なチャネルがない場合は、新しいチャネルが開かれ、リクエストの総数が減少すると「余分な」チャネルが閉じられます。



このクライアントを使用して、本質的にnet.Socketであるチャネル間で要求を分散できます。 これを行うには、チャネルの開閉方法を変更して、チャネルにリクエストを追加する必要があります。



説明する例では、 pgライブラリが使用されます。これは、データベースを備えたサーバーへのソケットを開くための機能を提供します。 同時に、ライブラリによって提供されるデフォルトの接続プール管理は、いかなる方法でも使用されません。



最初に、1つのエンティティ(接続)を管理するConnectクラスを検討します。



クラス接続
/*   ,      "pg://USER:PASSWORD@HOST:PORT/DATABASE" */ function Connect(connString) { //      this._connString = connString; //  ,     this._isRun = false; //      ,      "maxCount" this._maxQueryCount = 100; //  ,    _nextTick this._worked = false; //  ,    this._queryCount = 0; // ""  this._emitter = new (require('events').EventEmitter); //  "" var self = this; //      "open",      this._emitter.on('open', function() { self._arrayQuery = []; }); //      ,     ,    this._emitter.on('error', function(err) { throw err; }); //      ,    this._emitter.on('maxCount', function(message) { self._setMax = true; }); //        ,      , //      net.Socket pg.connect(this._connString, function(err, client, done) { if (err) { return self._emitter.emit('error', err); } //   ""    ,     self._client = client; // " "  self._done = done; //    (    ) self._emitter.emit('open'); }); } /* ,     ""    */ Connect.prototype.on = function(typeEvent, func) { if(typeEvent == 'error') { //          this._emitter.removeAllListeners('error'); } this._emitter.addListener(typeEvent, func); }; /* ,       */ Connect.prototype.start = function() { this._isRun = true; this._nextTick(); }; /* ,       */ Connect.prototype.stop = function() { this._isRun = false; }; /* ,    (  ) */ Connect.prototype.isFull = function() { return this._setMax; }; /* ,    (..     ,       ) */ Connect.prototype.close = function () { if(this._done) { this._emitter.emit('close'); this._done(); } else { this._emitter.emit('error', new Error('connect is not active')); } }; /* ,     */ Connect.prototype.queryQueue = function () { return this._arrayQuery; }; /*     -  .        ,  ,     */ Connect.prototype.addQuery = function (query, params, cb) { if(!(typeof query == 'string')) { return this._emitter.emit('error', new Error('not valid query')); } if( !(typeof params == "object") || !(params instanceof Array) ) { return this._emitter.emit('error', new Error('not valid argument')); } this._queryCount++; this._arrayQuery.push({ query: query, params: params, callback: cb }); if(this._queryCount > this._maxQueryCount) { this._emitter.emit('maxCount', 'in queue added too many requests, the waiting time increases'); } this._nextTick(); }; /*         */ Connect.prototype.maxQueryCount = function (count) { if(count) { this._maxQueryCount = count; } else { return this._maxQueryCount; } }; /*     */ Connect.prototype.queryCount = function () { return this._queryCount; }; /*   ,    ,     ,       ,             ( _arrayQuery)         */ Connect.prototype._nextTick = function() { var self = this; if(this._worked) { return; } while(this._isRun && this._arrayQuery.length>0) { this._worked = true; var el = this._arrayQuery.shift(); //     pg,     this._client.query(el.query, el.params, function(err, result) { self._queryCount--; if(err) { return el.callback(err); } el.callback(null, result); if(self._queryCount==0) { self._emitter.emit('drain'); self._setMax = false; } }) } this._worked = false; };
      
      











接続を管理するBalancerクラスを直接:新しい接続を開き、冗長な接続を閉じ、接続をリクエストを分配し、サービスに単一の入力を提供します



クラスバランサー
 /*  ,     */ function Balancer(minCountConnect, maxCountConnect) { //          this._maxCountConnect = maxCountConnect; //          this._minCountConnect = minCountConnect; //   this._connectArray = []; //   this._closedConnect = []; //   this._taskArray = []; //   this._run = false; //   this._emitter = new (require('events').EventEmitter); //   this._init(); } /*   ,   ,    */ Balancer.prototype._init = function() { this._cursor = 0; this.activQuery = 0; var self = this; var i=0; //   ,    var cycle = function() { i++; if(i<self._minCountConnect) { self._addNewConnect(cycle); } else { self._emitter.emit('ready'); } }; this._addNewConnect(cycle); }; /*  ,  ,    */ Balancer.prototype._addNewConnect = function(cb) { var self = this; var connect = new GPSconnect(connString); connect.on('open', function() { self._connectArray.push(connect); cb(); }); }; /* ,   ""      */ Balancer.prototype._cycle = function(pos) { for (var i=pos;i<this._connectArray.length;i++) { if( !(this._connectArray[i].isFull()) ) break; } return i; }; /* ,    */ Balancer.prototype._next = function(connect, el) { connect.addQuery(el.query, el.params, el.cb); connect.start(); this._distribution(); }; /*    -    .     "Round-robin"     .    ,      "",                ,      */ Balancer.prototype._distribution = function() { if(this._taskArray.length>0) { var el = this._taskArray.shift(); this._cursor = this._cycle(this._cursor); var self = this; if(this._cursor<this._connectArray.length) { var connect = this._connectArray[this._cursor]; this._next(connect, el); this._cursor++; } else { this._cursor = this._cycle(0); if(this._cursor<this._connectArray.length) { var connect = this._connectArray[this._cursor]; this._next(connect, el); this._cursor++; } else if( this._connectArray.length<this._maxCountConnect) { self._addNewConnect(function() { self._cursor = self._connectArray.length-1; var connect = self._connectArray[self._cursor]; self._next(connect, el); }); } else { for (var i=0;i<this._connectArray.length;i++) { if( this.activQuery/this._connectArray.length > this._connectArray[i].queryCount() ) { break; } } if(i==this._connectArray.length) { i = 0; } this._cursor = i; var connect = this._connectArray[this._cursor]; this._next(connect, el); } } } else { this._run = false; } }; /* ,     ""    */ Balancer.prototype.on = function(typeEvent, func) { this._emitter.addListener(typeEvent, func); }; /* ,      ,        ""      */ Balancer.prototype._removeLoad = function() { var self = this; var temp = this._connectArray[0].maxQueryCount().toFixed(); var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp; if(currentCount< this._connectArray.length ) { while( this._connectArray.length != currentCount ) { var poppedConnect = this._connectArray.pop(); if(poppedConnect.queryCount()==0) { poppedConnect.close(); } else { poppedConnect.index = self._closedConnect.length; poppedConnect.on('drain', function() { poppedConnect.close(); self._closedConnect.slice(poppedConnect.index, 1); }); self._closedConnect.push(poppedConnect); } } } }; /* C ,   -,     .  tube,       ,     . */ Balancer.prototype.addQuery = function(tube, query, params, cb) { this.activQuery++; var self = this; this._removeLoad(); var wrappCb = function() { self.activQuery--; cb.apply(this, arguments); }; this._taskArray.push({ query: query, params: params, cb: wrappCb }); if(!this._run) { this._run = true; this._distribution(); } };
      
      









これをすべて確認する方法は? テストでは、クエリ「select pg_sleep(1)」を使用します。これは1秒間実行され、データベースへのクエリをシミュレートします。



これらのリクエストのうち10,000件は、バランサによって処理され、最大101590ミリ秒で、接続リクエストの最大数は100で、チャネルの合計数の境界は10〜100のソケットです。



使用されたスクリプト:



 var balancer = new Balancer(10,100); balancer.on('ready', function() { var y=0; var time = +new Date(); for(var i=0;i<10000; i++) { balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) { if(err) console.log(err); y++; if(y==10000) { console.log(balancer._connectArray.length); console.log(+new Date()-time); } }); } });
      
      







すべてのソースはgithubで入手できます。

クライアントは未だに未加工です。もちろん、多くの作業を完了/書き換える必要がありますので、あまりscらないでください。 必要に応じて、私はより緊密に行うことができます。



All Articles