何と何が必要か
最初から必要なものを理解します。
-GET / POST / PUT / DELETEリクエストの送信
-URL列挙、およびPOST本体
-オープン接続の制御
-フロー制御
-テスト期間の表示
-1秒あたりの最大リクエスト数の制限
-HTTPサーバーのウォームアップ時の歪みを避けるために、統計から最初の数秒を除外する機能
計画
-接続プール
-シンプルなリクエスト/レスポンス
-統計
-利益
大声で考えます
接続を制御する必要があるため、標準のhttp.Clientは私たちには適しておらず(そのようなタスクには大きすぎます)、パフォーマンスが低下するために多くを把握しています。 要求を送信するための複数のワーカースレッドを意味するため、それらが相互に共有する接続のプールが必要です。 サーバーからワーカーへの応答を待つことは意味がありません。これで貴重な時間を失うだけです。 通過トラフィックを推定する方法は? 標準のhttp.Request、http.Resposeはそのような情報を提供しません。それらを使用しても機能しません。つまり、必要なすべてを提供する単純な要求/応答を実装する必要があります。 メモリはゴムではないため、生データを収集して最後に集約することはできません。 その場で彫像を置きます。
行こう
制限されたチャネルに基づいて接続プールを作成します。 オブジェクトの単純なプールのように見えます。彼らはオブジェクトをチャネルから取り出し、動作させ、元に戻します。
type Connection struct { conn net.Conn manager *ConnectionManager } type ConnectionManager struct { conns chan *Connection config *Config } func NewConnectionManager(config *Config) (result *ConnectionManager) { result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)} for i := 0; i < config.Connections; i++ { connection := &Connection{manager: result} if connection.Dial() != nil { ConnectionErrors++ } result.conns <- connection } return } func (this *ConnectionManager) Get() *Connection { return <-this.conns } func (this *Connection) Dial() error { if this.IsConnected() { this.Disconnect() } conn, err := net.Dial("tcp4", this.manager.config.Url.Host) if err == nil { this.conn = conn } return err } func (this *Connection) Disconnect() { this.conn.Close() this.conn = nil } func (this *Connection) IsConnected() bool { return this.conn != nil } func (this *Connection) Return() { this.manager.conns <- this }
ここでの要求/応答では、Goソースを読み、そこに実装されている方法を確認し、簡単な例えをすることができます。主な違いは、各要求/応答のトラフィック量を取得し、貴重な時間を節約できることです。
リクエスト
type Request struct { Method string URL *url.URL Header map[string][]string Body io.Reader ContentLength int64 Host string BufferSize int64 } func (req *Request) Write(w io.Writer) error { bw := &bytes.Buffer{} fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI()) fmt.Fprintf(bw, "Host: %s\r\n", req.Host) userAgent := "" if req.Header != nil { if ua := req.Header["User-Agent"]; len(ua) > 0 { userAgent = ua[0] } } if userAgent != "" { fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent) } if req.Method == "POST" || req.Method == "PUT" { fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength) } if req.Header != nil { for key, values := range req.Header { if key == "User-Agent" || key == "Content-Length" || key == "Host" { continue } for _, value := range values { fmt.Fprintf(bw, "%s: %s\r\n", key, value) } } } io.WriteString(bw, "\r\n") if req.Method == "POST" || req.Method == "PUT" { bodyReader := bufio.NewReader(req.Body) _, err := bodyReader.WriteTo(bw) if err != nil { return err } } req.BufferSize = int64(bw.Len()) _, err := bw.WriteTo(w) return err }
応答
type Response struct { Status string StatusCode int Header map[string][]string ContentLength int64 BufferSize int64 } func ReadResponse(r *bufio.Reader) (*Response, error) { tp := textproto.NewReader(r) resp := &Response{} line, err := tp.ReadLine() if err != nil { return nil, err } f := strings.SplitN(line, " ", 3) resp.BufferSize += int64(len(f) + 2) if len(f) < 2 { return nil, errors.New("Response Header ERROR") } reasonPhrase := "" if len(f) > 2 { reasonPhrase = f[2] } resp.Status = f[1] + " " + reasonPhrase resp.StatusCode, err = strconv.Atoi(f[1]) if err != nil { return nil, errors.New("malformed HTTP status code") } resp.Header = make(map[string][]string) for { line, err := tp.ReadLine() if err != nil { return nil, errors.New("Response Header ERROR") } resp.BufferSize += int64(len(line) + 2) if len(line) == 0 { break } else { f := strings.SplitN(line, ":", 2) resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1])) } } if cl := resp.Header["Content-Length"]; len(cl) > 0 { i, err := strconv.ParseInt(cl[0], 10, 0) if err == nil { resp.ContentLength = i } } buff := make([]byte, resp.ContentLength) r.Read(buff) resp.BufferSize += int64(resp.ContentLength) return resp, nil }
テスト時間が終了したときにスレッドをオフにするために、フローの操作を終了するチャネルと、各スレッドが作業を正しく完了したことを報告するチャネルを作成します
WorkerQuit := make(chan bool, *_threads) WorkerQuited := make(chan bool, *_threads)
また、アプリケーションがいつでもテストを完了することができるように、Ctr + C(SIGTERM)も待機します
//Start Ctr+C listen signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) //Wait timers or SIGTERM select { case <-time.After(config.Duration): case <-signalChan: } for i := 0; i < config.Threads; i++ { config.WorkerQuit <- true } //Wait for threads complete for i := 0; i < config.Threads; i++ { <-config.WorkerQuited }
ワーカー自身を見てみましょう:1秒あたりのリクエスト数を制限するために、合計数の1秒あたり4回を共有するたびに、カウンターを増やして、接続が解放されるか、作業が完了するのを待ちます
func NewThread(config *Config) { timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond) allow := int32(config.MRQ / 4 / config.Threads) if config.MRQ == -1 { allow = 2147483647 } else if allow <= 0 { allow = 1 } var connectionErrors int32 = 0 currentAllow := allow for { select { // case <-timerAllow.C: currentAllow = allow // case connection := <-config.ConnectionManager.conns: currentAllow-- // - if currentAllow < 0 { connection.Return() } else { // req := getRequest(config.Method, config.Url, config.Source.GetNext()) // if config.Reconnect && connection.IsConnected() { connection.Disconnect() } // , if !connection.IsConnected() { if connection.Dial() != nil { connectionErrors++ } } // , if connection.IsConnected() { go writeSocket(connection, req, config.RequestStats) } else { connection.Return() } } // case <-config.WorkerQuit: // atomic.AddInt32(&ConnectionErrors, connectionErrors) // config.WorkerQuited <- true return } } }
接続が解放されるとすぐに、次のリクエストを作成し、非同期で送信を開始します。そのため、タイムアウトになるまで円で囲みます。 要求が送信され、応答が読み取られた後、接続がプールに返され、スレッドは再びそれを取得します。
提出依頼
func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) { result := &RequestStats{} // defer func() { connection.Return() read <- result }() now := time.Now() conn := connection.conn bw := bufio.NewWriter(conn) // err := req.Write(bw) if err != nil { result.WriteError = err return } err = bw.Flush() if err != nil { result.WriteError = err return } // res, err := http.ReadResponse(bufio.NewReader(conn)) if err != nil { result.ReadError = err return } // result.Duration = time.Now().Sub(now) result.NetOut = req.BufferSize result.NetIn = res.BufferSize result.ResponseCode = res.StatusCode req.Body = nil }
残っているのは小さく、RequestStatsオブジェクトから統計を収集して整理するためです。
// type StatsSource struct { Readed int64 Writed int64 Requests int Skiped int Min time.Duration Max time.Duration Sum int64 Codes map[int]int DurationPercent map[time.Duration]int ReadErrors int WriteErrors int Work time.Duration } // type StatsSourcePerSecond struct { Readed int64 Writed int64 Requests int Skiped int Sum int64 } // func StartStatsAggregator(config *Config) { allowStore := true allowStoreTime := time.After(config.ExcludeSeconds) if config.ExcludeSeconds.Seconds() > 0 { allowStore = false } verboseTimer := time.NewTicker(time.Duration(1) * time.Second) if config.Verbose { fmt.Printf("%s %s %s %s %s %s\n", newSpancesFormatRightf("Second", 10, "%s"), newSpancesFormatRightf("Total", 10, "%s"), newSpancesFormatRightf("Req/sec", 10, "%s"), newSpancesFormatRightf("Avg/sec", 10, "%s"), newSpancesFormatRightf("In/sec", 10, "%s"), newSpancesFormatRightf("Out/sec", 10, "%s"), ) } else { verboseTimer.Stop() } source = StatsSource{ Codes: make(map[int]int), DurationPercent: make(map[time.Duration]int), } perSecond := StatsSourcePerSecond{} start := time.Now() for { select { // case <-verboseTimer.C: if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose { // avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped) avg := time.Duration(avgMilliseconds) * time.Millisecond // fmt.Printf("%s %s %s %s %s %s\n", newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"), newSpancesFormatRightf(source.Requests, 10, "%d"), newSpancesFormatRightf(perSecond.Requests, 10, "%d"), newSpancesFormatRightf(avg, 10, "%v"), newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"), newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"), ) } // perSecond = StatsSourcePerSecond{} // case <-allowStoreTime: allowStore = true // case res := <-config.RequestStats: // - , if res.ReadError != nil { source.ReadErrors++ continue } else if res.WriteError != nil { source.WriteErrors++ continue } // source.Requests++ perSecond.Requests++ perSecond.Readed += res.NetIn perSecond.Writed += res.NetOut source.Readed += res.NetIn source.Writed += res.NetOut // HTTP source.Codes[res.ResponseCode]++ if !allowStore { perSecond.Skiped++ source.Skiped++ continue } // sum := int64(res.Duration.Seconds() * 1000) source.Sum += sum perSecond.Sum += sum // if source.Min > res.Duration { source.Min = roundDuration(res.Duration) } if source.Max < res.Duration { source.Max = roundDuration(res.Duration) } // 10 duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10 source.DurationPercent[duration]++ // case <-config.StatsQuit: // source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond if config.Verbose { s := "" for { if len(s) >= 61 { break } s += "-" } fmt.Println(s) } // config.StatsQuit <- true return } } }
まとめると
開始引数を解析し、統計出力をフォーマットする方法はおもしろくないので省略します。 それでは、取得したものを確認しましょう。 サンプルでは、Node.jsクラスターにwrkを設定します
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html Running 30s test @ http://localhost:3001/index.html 7 threads and 21 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.09ms 6.55ms 152.07ms 99.63% Req/Sec 5.20k 3.08k 14.33k 58.75% Latency Distribution 50% 490.00us 75% 0.89ms 90% 1.83ms 99% 5.04ms 1031636 requests in 30.00s, 153.48MB read Requests/sec: 34388.25 Transfer/sec: 5.12MB
GOMAXPROCS = 1の場合も同様です
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html Stats: Min Avg Max Latency 0 0 83ms 843183 requests in 30s, net: in 103MB, out 62MB HTTP Codes: 200 100.00% Latency: 0 99.99% 10ms - 80ms 0.01% Requests: 28106.10/sec Net In: 27MBit/sec Net Out: 17MBit/sec Transfer: 5.5MB/sec
1秒あたり34388要求に対して28106を取得します-これは、純粋なC +イベントループ+ nioと比較して約20%少ないです。 GOMAXPROCSを変更する場合、ほとんどのプロセッサー時間はNode.jsによって費やされるため、実質的に違いはありません。
短所:
-20%のパフォーマンスの損失、要求/応答を簡素化することを試みることができます、少しパフォーマンスを与えるかもしれません
-HTTPSはまだサポートされていません
-カスタムHTTPヘッダーとタイムアウトをまだ指定できません
すべてのソースはこちら-Github
使い方
% go get github.com/a696385/go-meter % $GOPATH/bin/go-meter -h
ご清聴ありがとうございました!