GOでのダウンロードマネージャーの開発

GOのマルチスレッドダウンロードマネージャー。



http://loafter.github.io/godownloader/

https://github.com/Loafter/godownloader





エントリー



むかしむかし、1998年にインターネットにアクセスするために、父と仕事でモデムを使用しました。 彼は仕事の後の夕方にそれをつけました、そして、私は31.2 kbit / sという速い速度でインターネットの広大さを楽しむことができました。 当時、ヒステリックなブロガーはいませんでした。ページはメガバイト単位ではなく、ニュースサイトでは真実のみを語っていました。 当然、主な関心は資源でした。 写真、プログラム、車などのゲームへのあらゆる種類の追加。 覚えているように、IEを介したダウンロードは地獄でした。 500 kb以上のファイルをダウンロードすることは単に不可能でした。古代のロバはずっと頑固でした。



当時、Getright、Go!Zilla、Download Accelerator、そしてもちろんFlashGetなど、あらゆる種類のダウンロードマネージャーがいました。 当時、それらの90%が広告に悩まされていましたが、FlashGetが最高でした。 彼は、ダウンロードしたファイルを細かく分割する方法を知っていて、賢く働きました。 私の記憶では、バージョン1.7が最後でした。 私は当時このバージョンを使用していました。

15年が経ち、海外からvpn経由で大量のデータをダウンロードする必要がありました。

そして、15年で何が変わったのでしょうか?

なし。 前と同じように、最小限の変更ですべて同じマネージャーがいます。 flashgetでさえ、最新の3.xxとともにバージョン1.7を残しました。

wxfastダウンロードで50ギガバイトのファイルをダウンロードできなかった後、ダウンロードマネージャーを作成することにしました。これには、完了の程度を制御し、いつでも停止し、アプリケーションの起動間のダウンロードの状態を保存する機能を持つ多くのマルチスレッドタスクが含まれます。 これはすべて、GO言語にとって大きな課題です。



ラッパー



どこから始めますか? 最初に必要なことは、ダウンロードを停止して、ダウンロードの進行状況に関する情報をいつでも受信できるようにすることです。 GOには、プログラムで使用できる軽量スレッドがあります。 つまり、少なくとも1つのダウンロード用のストリームと、プロセス制御(停止、ダウンロードの開始、進行状況に関する情報の受信)用のストリームが少なくとも1つあります。 ダウンロードしたデータに関する情報を取得するプロセスに問題がない場合(ストリームを介して参照することで取得できます)、ダウンロードの停止が少し複雑になるプロセスでは、別のゴルーチンを停止または強制終了できません。 しかし、ストリームを終了する信号を彼女に送信できます。 実際にそうします。 一時停止の可能性がある任意の個別の作品を作成し、作品の状態に関する情報を受け取ることができるシンプルなラッパーを実装します。

構造をラップするには、次のインターフェイスをサポートする必要があります。



type DiscretWork interface { DoWork() (bool, error) GetProgress() interface{} BeforeRun() error AfterStop() error }
      
      





ラッパー自体:



 func (mw *MonitoredWorker) wgoroute() { log.Println("info: work start", mw.GetId()) mw.wgrun.Add(1) defer func() { log.Print("info: release work guid ", mw.GetId()) mw.wgrun.Done() }() for { select { case newState := <-mw.chsig: if newState == Stopped { mw.state = newState log.Println("info: work stopped") return } default: { isdone, err := mw.Itw.DoWork() if err != nil { log.Println("error: guid", mw.guid, " work failed", err) mw.state = Failed return } if isdone { mw.state = Completed log.Println("info: work done") return } } } } } func (mw *MonitoredWorker) Start() error { mw.lc.Lock() defer mw.lc.Unlock() if mw.state == Completed { return errors.New("error: try run completed job") } if mw.state == Running { return errors.New("error: try run runing job") } if err := mw.Itw.BeforeRun(); err != nil { mw.state = Failed return err } mw.chsig = make(chan int, 1) mw.state = Running go mw.wgoroute() return nil }
      
      





スレッドwgoroute()をループで開始した後、関数はDoWork()メソッドを呼び出すことにより、各反復をステップごとに実行します。作業の実行中にエラーが発生した場合、関数はループを終了してスレッドを終了します。 ループ内でも、チャネルからのサンプリングが実行されます。



 select { case newState := <-mw.chsig: if newState == Stopped { mw.state = newState log.Println("info: work stopped") return }
      
      





Stoppedメッセージが到着すると、アルゴリズムはストリームを終了し、適切な状態を設定します。

言語の組み込みテストツールを使用して、ラッパーをテストします。



 package dtest import ( "errors" "fmt" "godownloader/monitor" "log" "math/rand" "testing" "time" ) type TestWorkPool struct { From, id, To int32 } func (tw TestWorkPool) GetProgress() interface{} { return tw.From } func (tw *TestWorkPool) BeforeRun() error { log.Println("info: exec before run") return nil } func (tw *TestWorkPool) AfterStop() error { log.Println("info: after stop") return nil } func (tw *TestWorkPool) DoWork() (bool, error) { time.Sleep(time.Millisecond * 300) tw.From += 1 log.Print(tw.From) if tw.From == tw.To { fmt.Println("done") return true, nil } if tw.From > tw.To { return false, errors.New("tw.From > tw.To") } return false, nil } func TestWorkerPool(t *testing.T) { wp := monitor.WorkerPool{} for i := 0; i < 20; i++ { mw := &monitor.MonitoredWorker{Itw: &TestWorkPool{From: 0, To: 20, id: rand.Int31()}} wp.AppendWork(mw) } wp.StartAll() time.Sleep(time.Second) log.Println("------------------Work Started------------------") log.Println(wp.GetAllProgress()) log.Println("------------------Get All Progress--------------") time.Sleep(time.Second) wp.StopAll() log.Println("------------------Work Stop-------------------") time.Sleep(time.Second) wp.StartAll() time.Sleep(time.Second * 5) wp.StopAll() wp.StartAll() wp.StopAll() }
      
      





データの読み込み



タスクのワーキングラッパーを作成したら、メイン機能であるhttpを介したデータのダウンロードに進みます。 おそらく、httpプロトコルの主な問題は、データを単一のストリームにロードする際の速度が遅いことです。 そのため、インターネットが遅いときに、ダウンロードしたファイルをフラグメントに分割して複数のhttp接続にアップロードする方法を知っているダウンロードマネージャーが非常に多くいました。これにより、速度が向上しました。 当然、ダウンロードマネージャーも例外ではなく、複数のストリームでファイルを取得できる必要があります。 このスキームの通常の操作では、サーバーが再開をサポートする必要があります。



クライアント側では、ヘッダーにRangeフィールドがあるリクエストを生成する必要があります。 生の形式のリクエスト全体は次のようになります。



 GET /PinegrowLinux64.2.2.zip HTTP/1.1 Host: pinegrow.s3.amazonaws.com User-Agent: Go-http-client/1.1 Range: bytes=34010904-42513630
      
      





私の最初の実装は非常にゆっくりと働きました。 問題は、データの小さな部分ごとにリクエストが準備されたことです。 100 kbのブロックで1から2メガバイトのセグメントをダウンロードする必要がある場合。 つまり、ブロックごとに10個のクエリが順番に実行されました。 私はすぐに何かが間違っていることに気づきました。

Wiresharkプログラムでは、別のプログラム(ダウンロードマスター)によるダウンロードの実行方法を確認しました。 作業の正しいスキームは異なっていました。 10個のセグメントをダウンロードする必要がある場合、各セグメントに対して最初に10個のhttp-requestが準備され、ブロックへの分割は1つのhttp-response内のbodyブロックからの順次読み取りによって実現されました。



 func (pd *PartialDownloader) BeforeDownload() error { //create new req r, err := http.NewRequest("GET", pd.url, nil) if err != nil { return err } r.Header.Add("Range", "bytes="+strconv.FormatInt(pd.dp.Pos, 10)+"-"+strconv.FormatInt(pd.dp.To, 10)) f,_:=iotools.CreateSafeFile("test") r.Write(f) f.Close() resp, err := pd.client.Do(r) if err != nil { log.Printf("error: error download part file%v \n", err) return err } //check response if resp.StatusCode != 206 { log.Printf("error: file not found or moved status:", resp.StatusCode) return errors.New("error: file not found or moved") } pd.req = *resp return nil } …. func (pd *PartialDownloader) DownloadSergment() (bool, error) { //write flush data to disk buffer := make([]byte, FlushDiskSize, FlushDiskSize) count, err := pd.req.Body.Read(buffer) if (err != nil) && (err.Error() != "EOF") { pd.req.Body.Close() pd.file.Sync() return true, err } //log.Printf("returned from server %v bytes", count) if pd.dp.Pos+int64(count) > pd.dp.To { count = int(pd.dp.To - pd.dp.Pos) log.Printf("warning: server return to much for me i give only %v bytes", count) } realc, err := pd.file.WriteAt(buffer[:count], pd.dp.Pos) if err != nil { pd.file.Sync() pd.req.Body.Close() return true, err } pd.dp.Pos = pd.dp.Pos + int64(realc) pd.messureSpeed(realc) //log.Printf("writed %v pos %v to %v", realc, pd.dp.Pos, pd.dp.To) if pd.dp.Pos == pd.dp.To { //ok download part complete normal pd.file.Sync() pd.req.Body.Close() pd.dp.Speed = 0 log.Printf("info: download complete normal") return true, nil } //not full download next segment return false, nil }
      
      





メモの前の部分からDiscretWorkインターフェイスでブートローダークラスをラップしたら、その動作をテストすることができます。



 func TestDownload(t *testing.T) { dl, err := httpclient.CreateDownloader("http://pinegrow.s3.amazonaws.com/PinegrowLinux64.2.2.zip", "PinegrowLinux64.2.2.zip", 7) if err != nil { t.Error("failed: can't create downloader") } errs := dl.StartAll() if len(errs)>0 { t.Error("failed: can't start downloader") } …..wait for finish download }
      
      





インターフェース



かなり長い間、私はすべてのサービスを1つのスキームに従って行っています。 原則として、ユーザーがhttp要求を介してjsonサービスとやり取りするためのWebインターフェース。 このワークフローには、以下にリストされている従来のグラフィックインターフェイスに勝るいくつかの利点があります。





インターフェイスは500ミリ秒ごとに更新されます。 擬似ファイルlocalhost / progress.jsonは 、ダウンロードテーブルのデータソースとして使用されます。 ブラウザで開くと、動的に更新されるjsonデータが開きます。 テーブルのコンポーネントとして、jgridが使用されます。 そのシンプルさにより、コードは非常に小さなスペースを占有します。



画像





 function UpdateTable() { $("#jqGrid") .jqGrid({ url: 'http://localhost:9981/progress.json', mtype: "GET", ajaxSubgridOptions: { async: false }, styleUI: 'Bootstrap', datatype: "json", colModel: [{ label: '#', name: 'Id', key: true, width: 5 }, ….. { label: 'Speed', name: 'Speed', width: 15, formatter: FormatByte }, { label: 'Progress', name: 'Progress', formatter: FormatProgressBar }], viewrecords: true, rowNum: 20, pager: "#jqGridPager" }); }
      
      





サービスを終了して設定を保存する



このサービスには、もう1つの興味深い機能があります。 これがWebサービスの終了方法です。 実際のところ、http-serviceが起動されると、プログラムはstart関数でハングし、アプリケーションが完了するまでハングします。 しかし、Goには、オペレーティングシステムから送信されたシグナルをサブスクライブする機能があります。 したがって、killコマンドを使用して実行し、最終的なアクションを実行する場合でも、プロセスが終了する瞬間をキャプチャできます。 たとえば、これにより、設定と現在のダウンロードの進行状況が保持されます。



 c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c func() { gdownsrv.StopAllTask() log.Println("info: save setting ", gdownsrv.SaveSettings(getSetPath())) }() os.Exit(1) }()
      
      





go httpサービスの標準実装には多くの拡張機能があり、サービスの完了後にアクションを実行できます。 私の意見では、上記の方法は最も単純で信頼性が高く、この方法はサービスを停止しても機能します。



原則として、これはおそらく読者に伝えたかったことのすべてです。



ダウンロードマネージャーが他のユーザーにどの程度関連しているかはわかりませんが、ダウンロードマネージャーの助けを借りて、既に仮想マシンのディストリビューションとイメージをダウンロードしています。

しかし、私は定期的にチェックサムをチェックします。



Mac、Windows、Linuxのリリースをダウンロード: http : //loafter.github.io/godownloader/



Git: https : //github.com/Loafter/godownloader



All Articles