以下に説明する例では、 Socks 4プロキシサーバーを実装する非常に現実的なタスクの例を使用して、ノンブロッキングソケットを操作する基本原理を実証しようとします。
人生の間、非ブロッキングソケットでは何でも起こります。つまり、
ServerSocketChannel
- OP_ACCEPT-着信接続
ソケットチャネル
- OP_READ-ニップルでのデータまたは切断
- OP_WRITE-記録または切断の準備ができたニップル
- OP_CONNECT-接続または確立済みかどうか
メソッドのいずれかを使用して何かが発生したソケットが選択されます
- select() -ブロッキングメソッド、イベントまたはwakeUp()で起動します
- select(long) -タイムアウトのみの同じトピック
- selectNow() -まあ、ノンブロッキングオプション
私たちの場合、プロキシは受動的なものなので、基本的なブロック選択()の方が適しています。
その後、最後の選択でアクティブになったキーをセレクターに要求し、メソッドisAcceptable() 、 isReadable() 、 isWriteable() 、 isConnectable()を使用して、何が起こったのかを調べる必要があります。
プロキシサーバーの基本的なアルゴリズムは次のとおりです。
- 接続を受け入れる
- ヘッダーの解析(この手順を簡素化するために、ヘッダーサイズは常にバッファーサイズよりも小さいと想定しています)
- 目標との関係を確立します
- 私たちはクライアントにすべてがOKであると答えます
- プロキシ
- 接続を閉じる
完全なソケットバッファに関する問題を回避するために、次のようにプロキシします。
A.in = B.outの2つの端AとBを持ち、A.interestOps()| OP_READ!= B.interestOps()| OP_WRITE(1つのバッファーが2つのチャネルで同時に使用されないようにする)
パーティの1つが接続を閉じた後、データをバッファから2番目のサイドに追加して、接続を閉じる必要があります。
さて、実際にはコード自体、アルゴリズムの理解を簡単にするためにアクションの順序で並べようとした関数、コメントが添付されています。
パッケージ ru.habrahabr ;
import java.io.IOException ;
import java.net.InetAddress ;
import java.net.InetSocketAddress ;
import java.net.UnknownHostException ;
import java.nio.ByteBuffer ;
import java.nio.channels.ClosedChannelException ;
import java.nio.channels.SelectionKey ;
import java.nio.channels.Selector ;
import java.nio.channels.ServerSocketChannel ;
import java.nio.channels.SocketChannel ;
import java.nio.channels.spi.SelectorProvider ;
import java.util.Iterator ;
/ **
*シンプルなノンブロッキングSocks 4 Proxy Serverを実装するクラス
*接続コマンドのみ
*
* @author dgreen
* @date 09/19/2009
*
* /
パブリック クラス Socks4Proxy は Runnable {
int bufferSize = 8192 ;
/ **
*ポート
* /
intポート;
/ **
*ホスト
* /
文字列ホスト;
/ **
*各キーにしがみついている追加情報{@link SelectionKey}
*
* @author dgreen
* @date 09/19/2009
*
* /
静的 クラス Attachment {
/ **
*読み取り用のバッファ。プロキシするときは、バッファになります
*ピアに保存されているキーのエントリ
*
*重要:Socks4ヘッダーを解析するとき、サイズは
* Mozillaブラウザーの通常のヘッダーのサイズより大きいバッファー
* Firefox、ヘッダーサイズは12バイト1バージョン+ 1コマンド+ 2ポート+
* 4 ip + 3 id(MOZ)+ 1 \ 0
* /
ByteBuffer in ;
/ **
*書き込み時のバッファ、プロキシの時点では、読み取り用のバッファと同じ
*ピアに保存されたキー
* /
ByteBuffer out ;
/ **
*プロキシする場所
* /
SelectionKeyピア;
}
/ **
*答えはOKに見えるか、サービスが提供されています
* /
static final byte [ ] OK = new byte [ ] { 0x00、0x5a、0x00、0x00、0x00、0x00、0x00、0x00 } ;
/ **
*ノンブロッキングサーバーの心臓部であり、実際にはアプリケーションごとに変わりません
*アプリケーションで、非ブロッキングサーバーを使用しない限り
*マルチスレッドアプリケーション、および他のスレッドからのキーの操作
*いくつかのKeyChangeRequestが追加されますが、このアプリケーションではこれはありません
*ニーズ
* /
@ オーバーライド
public void run ( ) {
{
//セレクターを作成します
Selector selector = SelectorProvider 。 プロバイダー ( ) 。 openSelector ( ) ;
//サーバーチャネルを開きます
ServerSocketChannel serverChannel = ServerSocketChannel 。 オープン ( ) ;
//ロックを解除します
serverChannel。 configureBlocking ( false ) ;
//ポートをハングアップします
serverChannel。 ソケット ( ) 。 bind ( new InetSocketAddress ( host、port ) ) ;
//セレクターへの登録
serverChannel。 register ( selector、serverChannel。validOps ( ) ) ;
//ノンブロッキングサーバーのメインループ
//このループは、ほとんどすべての非ブロッキングで同じです
//サーバー
while ( selector。select ( ) > - 1 ) {
//その時点でイベントが発生したキーを取得します
//最後の選択
イテレーター< SelectionKey >イテレーター=セレクター。 selectedKeys ( ) 。 イテレータ ( ) ;
while (イテレータ。hasNext ( ) ) {
SelectionKeyキー=イテレーター。 次へ ( ) ;
イテレータ。 削除 ( ) ;
if ( key。isValid ( ) ) {
//考えられるすべてのキーイベントを処理します
{
if ( key。isAcceptable ( ) ) {
//接続を受け入れます
受け入れる(キー) ;
} else if ( key。isConnectable ( ) ) {
//接続を設定します
接続(キー) ;
} else if ( key。isReadable ( ) ) {
//データを読み取ります
読み取り(キー) ;
} else if ( key。isWritable ( ) ) {
//データを書き込みます
書き込み(キー) ;
}
} catch ( 例外 e ) {
e。 printStackTrace ( ) ;
閉じる(キー) ;
}
}
}
}
} catch ( 例外 e ) {
e。 printStackTrace ( ) ;
新しい IllegalStateException ( e )を スローし ます。
}
}
/ **
*関数は接続を受け入れ、目的のアクションでキーを登録します
*データの読み取り(OP_READ)
*
* @paramキー
*イベントが発生したキー
* @throws IOException
* @throws ClosedChannelException
* /
private void accept ( SelectionKey key ) throws IOException 、 ClosedChannelException {
//受け入れられます
SocketChannel newChannel = ( ( ServerSocketChannel ) key。Channel ( ) ) 。 受け入れる ( ) ;
//ノンブロッキング
newChannel。 configureBlocking ( false ) ;
//セレクターに登録します
newChannel。 register ( key。selector ( ) 、 SelectionKey。OP_READ ) ;
}
/ **
*現在利用可能なデータを読み取ります。 関数は2つの状態にあります-
*リクエストヘッダーとダイレクトプロキシの読み取り
*
* @paramキー
*イベントが発生したキー
* @throws IOException
* @throws UnknownHostException
* @throws ClosedChannelException
* /
private void read ( SelectionKey key ) throws IOException 、 UnknownHostException 、 ClosedChannelException {
SocketChannel channel = ( ( SocketChannel ) key。Channel ( ) ) ;
Attachment attachment = ( ( Attachment ) key。Attachment ( ) ) ;
if ( attachment == null ) {
//バッファを遅延初期化
キー。 attach ( attachment = new Attachment ( ) ) ;
添付ファイル。 in = ByteBuffer 。 割り当てる ( bufferSize ) ;
}
if ( channel。read ( attachment。in ) < 1 ) {
// -1-ギャップ0-バッファにスペースがありません。これは、
//ヘッダーがバッファサイズを超えました
閉じる(キー) ;
} else if ( attachment。peer == null ) {
//セカンドエンドがない場合:)
readHeader ( key、attachment ) ;
} else {
//まあ、プロキシする場合は、2番目の端に関心を追加します
//書き込み
添付ファイル。 ピア 。 interestOps ( attachment。peer。interestOps ( ) | SelectionKey。OP_WRITE ) ;
//そして、最初に読むものへの関心を取り除きます。彼らはまだ書いていないからです
//現在のデータ、何も読み込まない
キー。 interestOps ( key。interestOps ( ) ^ SelectionKey。OP_READ ) ;
//書き込み用のバッファを準備します
添付ファイル。 で 。 フリップ ( ) ;
}
}
private void readHeader ( SelectionKey key、Attachment attachment ) throws IllegalStateException 、 IOException 、
UnknownHostException 、 ClosedChannelException {
バイト [ ] ar =添付ファイル。 で 。 配列 ( ) ;
if ( ar [ attachment。in。position ( ) - 1 ] == 0 ) {
//最後のバイト\ 0がユーザーIDの最後である場合。
if ( ar [ 0 ] ! = 4 && ar [ 1 ] ! = 1 || attachment。in。position ( ) < 8 ) {
//プロトコルのバージョンと有効性の簡単なチェック
//コマンド
// conectのみをサポートします
throw IllegalStateException ( "Bad Request" ) ;
} else {
//接続を作成します
SocketChannel peer = SocketChannel 。 オープン ( ) ;
ピア。 configureBlocking ( false ) ;
//パケットからアドレスとポートを取得します
byte [ ] addr = new byte [ ] { ar [ 4 ] 、ar [ 5 ] 、ar [ 6 ] 、ar [ 7 ] } ;
int p = ( ( ( 0xFF & ar [ 2 ] ) << 8 ) + ( 0xFF & ar [ 3 ] ) ) ;
//接続の確立を開始します
ピア。 connect ( 新しい InetSocketAddress ( InetAddress.getByAddress ( addr ) 、p ) ) ;
//セレクターへの登録
SelectionKey peerKey =ピア。 register ( key。selector ( ) 、 SelectionKey。OP_CONNECT ) ;
//要求している接続をミュートします
キー。 interestOps ( 0 ) ;
//鍵交換:)
添付ファイル。 peer = peerKey ;
添付ファイルpeerAttachemtn = new Attachment ( ) ;
peerAttachemtn。 peer = key ;
peerKey。 添付 ( peerAttachemtn ) ;
//ヘッダーバッファーをクリアします
添付ファイル。 で 。 クリア ( ) ;
}
}
}
/ **
*バッファからデータを書き込む
*
* @paramキー
* @throws IOException
* /
private void write ( SelectionKey key ) throws IOException {
//すべてのデータを書き込むことによってのみソケットを閉じます
SocketChannel channel = ( ( SocketChannel ) key。Channel ( ) ) ;
Attachment attachment = ( ( Attachment ) key。Attachment ( ) ) ;
if ( channel。write ( attachment。out ) ==- 1 ) {
閉じる(キー) ;
} else if ( attachment。out。remaining ( ) == 0 ) {
if ( attachment。peer == null ) {
//バッファにあったものを追加して閉じます
閉じる(キー) ;
} else {
//すべてが書き込まれている場合、バッファをクリアします
添付ファイル。 アウト 。 クリア ( ) ;
//読書の興味を2番目の端に追加
添付ファイル。 ピア 。 interestOps ( attachment.peer。interestOps ( ) | SelectionKey.OP_READ ) ;
//そして、レコードへの関心を取り除きます
キー。 interestOps ( key。interestOps ( ) ^ SelectionKey。OP_WRITE ) ;
}
}
}
/ **
*接続を終了する
*
* @paramキー
*イベントが発生したキー
* @throws IOException
* /
private void connect ( SelectionKey key ) throws IOException {
SocketChannel channel = ( ( SocketChannel ) key。Channel ( ) ) ;
Attachment attachment = ( ( Attachment ) key。Attachment ( ) ) ;
//接続を終了します
チャンネル。 finishConnect ( ) ;
//バッファを作成してOKと答えます
添付ファイル。 in = ByteBuffer 。 割り当てる ( bufferSize ) ;
添付ファイル。 で 。 置く ( OK ) 。 フリップ ( ) ;
添付ファイル。 out = ( (添付ファイル) attachment。peer。attachment ( ) ) 。 で ;
( (添付ファイル)添付ファイル。 ピア 。 添付ファイル ( ) ) 。 out =添付ファイル。 で ;
//フラグの2番目の端を書き込みおよび読み取りに設定します
//彼女がOKと書くとすぐに、もう一方の端を読み取りに切り替え、それで終わりです
//幸せになる
添付ファイル。 ピア 。 interestOps ( SelectionKey。OP_WRITE | SelectionKey。OP_READ ) ;
キー。 interestOps ( 0 ) ;
}
/ **
*コメントなし
*
* @paramキー
* @throws IOException
* /
private void close ( SelectionKey key ) throws IOException {
キー。 キャンセル ( ) ;
キー。 チャンネル ( ) 。 閉じる ( ) ;
SelectionKey peerKey = ( (添付ファイル)キー。 添付ファイル ( ) ) 。 ピア ;
if ( peerKey ! = null ) {
( (添付ファイル) peerKey。Attachment ( ) ) 。 peer = null ;
if ( ( peerKey。interestOps ( ) & SelectionKey。OP_WRITE ) == 0 ) {
( (添付ファイル) peerKey。Attachment ( ) ) 。 アウト 。 フリップ ( ) ;
}
peerKey。 interestOps ( SelectionKey.OP_WRITE ) ;
}
}
public static void main ( String [ ] args ) {
Socks4Proxyサーバー= 新しい Socks4Proxy ( ) ;
サーバー。 host = "127.0.0.1" ;
サーバー。 ポート = 1080 ;
サーバー。 実行 ( ) ;
}
}
次に、お気に入りのブラウザを開き、socks 4プロキシを選択し、127.0.0.1:1080と入力してパフォーマンスを確認します。