まえがき
この記事は、フォーラムでの頻繁な質問と、CPythonでのマルチスレッドのトピックに関するicqでの質問の両方の後によく書き始めました。 それらを尋ねる人々の問題は、主にマルチスレッドアプリケーションの基本原則の無知または誤解から来ています。 少なくとも、これは、スレッドプールと呼ばれる、使用するマルチスレッドモデルに当てはまります。 別の一般的な問題は別の問題です。人々は標準のCPythonモジュールを扱う基本的なスキルを持っていません。 私の謙虚な意見では重要ではないので、この記事では、人格についてではなく、そのような無知の例を挙げようとします。 この記事が書かれている条件に基づいて、プロキシサーバーを介した作業に少し影響を与えます(SOCKSと混同しないでください)。
詳細
この記事は、CPythonの最新バージョンが2番目のブランチで2.6.2、3番目のブランチで3.1.1であったときに書かれました。 この記事では、CPython 2.6の新しいステートメントをステートメントを使用して使用します。したがって、このコードを以前のバージョンで使用する場合は、独自の判断でコードをやり直す必要があります。 この記事の執筆プロセスでは、「すぐに使用可能な」標準モジュールのみが使用されました。 また、私はプロのプログラマーではなく、独学であるという事実に基づいて、特定の概念の解釈に関する不正確な可能性について尊敬する聴衆に謝罪します。 したがって、可能な限り回答する質問をすることを勧めます。
だから、実際に始めましょう、私が初めて説明しようとしていたことは、尊敬されているlorienによってpython.suによって推奨されました(彼のキューの例では、通常、別のスレッドで処理されました:))それは彼からのものであり、スレッドプールではなくタスクプールである可能性がさらに高いです(ただし、この用語の解釈が間違っている可能性があります)。
マルチスレッドアプリケーションとは何ですか? これは、特定の数のスレッドが特定のタスクを実行するアプリケーションです。 多くの場合の問題は、メインストリームがアクティブである限り、フローが互いに別々に動作するという事実を完全に捕捉できないことです。 個人的には、気にしないように書きますが、後で詳しく説明します。 また、彼らの問題は、いわゆる「ヒンドゥー語」コードであり、これはどこからでも単純かつ思いやりなくコピーされ、プログラムは「ただ動作する」レベルになります。 紳士、一度だけ学ぶ:プログラムのこの部分またはその部分がどのように機能するかを理解していない場合は、それを書き直して、あなたが理解できるようにします。問題なく、このコードを使用できます。 主なものは、作成がどのように機能するかを理解することです。
スレッドの個別の作業の問題に触れます。 紳士、それはあなたが既にそれを書いたときではなく、あなたがアプリケーションを書き始める前にフローの相互作用を考慮する価値があります。 原則として、アプリケーションのソースコードを操作するためのいくつかのルールを順守している場合、シングルスレッドからマルチスレッドへのプログラムのリエンジニアリングは簡単、簡単、迅速です。
メインスレッドのアクティビティについて。 どうやら、1つのスレッドを開始すると、2つのスレッドが実際に機能します。 現在アクティブなスレッドの数は、あなたが現在実行しているスレッドの数と、アプリケーションの本体が動作するストリームを+1することを理解する必要があります。 個人的には、メインストリームと自分が立ち上げたストリームを明確に区別するように書きます。 これが行われないと、アプリケーションは(あなたのように)途中で終了する可能性がありますが、実際にはアプリケーションは作成したとおりに動作します。
言葉で明確に思えますが、今は練習を始めています。 実際には、CPythonはGIL(Global Interpreter Lock)のような理解を持っています。 simとは、アプリケーションのスレッドがプロセッサにアクセスしている瞬間のインタープリターのグローバルロックを意味します。 実際、1つの瞬間にプロセッサで動作するスレッドは1つだけです。 この点で、標準CPythonで一般的に起動できるスレッドの最大数は、約350個変動します。
例として、 マルチスレッドパーサーwww.google.comを実装しようとします。 上で書いたように、もっぱら標準モジュールが動作するために使用されます。モジュールurllib2、urllib、キュー、スレッド、タスクを完了するにはreが必要です。
順番に:
#==================< >==================
import urllib2
# HTTP,
import urllib
# HTTP, urllib2,
# - urllib.urlquote
from Queue import Queue
#, "Pool", ,
# ,
#
import threading
# ,
#threading.active_count, threading.Thread, threading.Thread.start,
#threading.Rlock
import re
# ,
#
import time
# , sleep
queue = Queue()
# , (..
# Queue Queue )
#==================</ >=================
#==============================<>==============================
PROXY = "10.10.31.103:3128"
# -,
# ,
# PROXY, -.
# None
HEADERS = { "User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1" ,
"Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1" ,
"Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7" ,
"Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1" ,
"Accept-Encoding" : "identity, *;q=0" ,
"Connection" : "Keep-Alive" }
# www.google.com
# , HEADERS,
# Opera ,
# zlib compressed data, ..
# - ,
# ...
THREADS_COUNT = 10
# , -
DEEP = 30
# - , ,
# , ,
# .
ENCODING = "UTF-8"
# (
# )
#==============================</>===================================
LOCK = threading . RLock()
# threading
# LOCK, threading.RLock
# threading, - ,
#
#acquire() threading.RLock threading.Lock (
# threading) ,
#threading.RLock , threading.Lock
# .
#==================< >==================
import urllib2
# HTTP,
import urllib
# HTTP, urllib2,
# - urllib.urlquote
from Queue import Queue
#, "Pool", ,
# ,
#
import threading
# ,
#threading.active_count, threading.Thread, threading.Thread.start,
#threading.Rlock
import re
# ,
#
import time
# , sleep
queue = Queue()
# , (..
# Queue Queue )
#==================</ >=================
#==============================<>==============================
PROXY = "10.10.31.103:3128"
# -,
# ,
# PROXY, -.
# None
HEADERS = { "User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1" ,
"Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1" ,
"Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7" ,
"Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1" ,
"Accept-Encoding" : "identity, *;q=0" ,
"Connection" : "Keep-Alive" }
# www.google.com
# , HEADERS,
# Opera ,
# zlib compressed data, ..
# - ,
# ...
THREADS_COUNT = 10
# , -
DEEP = 30
# - , ,
# , ,
# .
ENCODING = "UTF-8"
# (
# )
#==============================</>===================================
LOCK = threading . RLock()
# threading
# LOCK, threading.RLock
# threading, - ,
#
#acquire() threading.RLock threading.Lock (
# threading) ,
#threading.RLock , threading.Lock
# .
マルチスレッドのシームレスな実装の基本原則であるコードのモジュール性を考慮します。 ファイルまたはクラスを分離するために使用する関数を転送する必要はありません。少なくともこれらは別個の関数であれば十分です(しゃれはご容赦ください)。
次に、ストリームの作業を個別の関数に分離し、一種の抽象的な概念の作業にします。 この段階では、現時点では理解できない点がいくつかありますが、これらはすべて理解可能なアルゴリズムに組み込まれます。
def worker ():
# worker,
global queue
#
# ,
# (!)
while True :
# ,
try :
# , try/except,
# QueueEmpty , ,
#
target_link = queue . get_nowait()
#
# queue
except Exception , error:
#
return
#
parsed_data = get_and_parse_page(target_link)
# ,
#
if parsed_data != "ERROR" :
# ,
write_to_file(parsed_data)
#
else :
queue . put(target_link)
# , queue
def worker ():
# worker,
global queue
#
# ,
# (!)
while True :
# ,
try :
# , try/except,
# QueueEmpty , ,
#
target_link = queue . get_nowait()
#
# queue
except Exception , error:
#
return
#
parsed_data = get_and_parse_page(target_link)
# ,
#
if parsed_data != "ERROR" :
# ,
write_to_file(parsed_data)
#
else :
queue . put(target_link)
# , queue
明確に理解する必要がある主なことは、スレッド自体のアルゴリズムと、スレッドが互いに独立して正確に処理するものです。 合計で、ストリームのタスクは非常に単純です-検索ページへのリンクを取得し、ハンドラー関数に渡します。ハンドラー関数に、リンクとタイトルをファイルに書き込んだ後、見つかったサイトとこれらのサイトのタイトルへのリンクが返されます(これはすべてparsed_dataにあります)。
作業が進むにつれて、最も単純なものから最も複雑なものまでのタスクが実装されるため、マルチスレッドの問題はこれらの段階では実際には対処されません。 次に、ファイルへの書き込みを担当する関数を実装する番です。
def write_to_file (parsed_data):
# write_to_file, –
global LOCK
global ENCODING
LOCK . acquire()
#" ",
#
with open ( "parsed_data.txt" , "a" ) as out:
# with statement, parsed_data.txt
# "a", ,
# out ( )
for site in parsed_data:
# parsed data,
# site
link, title = site[ 0 ], site[ 1 ]
# link title site
title = title . replace( "<em>" , "" ) . replace( "</em>" , "" ) . replace( "<b>" , "" ) . replace( "</b>" , "" )
#.replace - HTML-, title
out . write( u"{link}|{title} \n " . format(link = link, title = title) . encode( "cp1251" ))
# ,
# .format, % ,
# , :
# | title \n - (
# cp1251)
LOCK . release()
#"" ,
# . -,
# , ,
# ( )
# “ ”
# parsed_data.txt
def write_to_file (parsed_data):
# write_to_file, –
global LOCK
global ENCODING
LOCK . acquire()
#" ",
#
with open ( "parsed_data.txt" , "a" ) as out:
# with statement, parsed_data.txt
# "a", ,
# out ( )
for site in parsed_data:
# parsed data,
# site
link, title = site[ 0 ], site[ 1 ]
# link title site
title = title . replace( "<em>" , "" ) . replace( "</em>" , "" ) . replace( "<b>" , "" ) . replace( "</b>" , "" )
#.replace - HTML-, title
out . write( u"{link}|{title} \n " . format(link = link, title = title) . encode( "cp1251" ))
# ,
# .format, % ,
# , :
# | title \n - (
# cp1251)
LOCK . release()
#"" ,
# . -,
# , ,
# ( )
# “ ”
# parsed_data.txt
以下は、get_and_parse_page関数の実装です。
def get_and_parse_page (target_link):
# , –
global PROXY
# , PROXY
#
global HEADERS
# Headers
if PROXY is not None :
# PROXY None
proxy_handler = urllib2 . ProxyHandler( { "http" : "" + PROXY + "/" } )
# -
opener = urllib2 . build_opener(proxy_handler)
# opener c -
urllib2 . install_opener(opener)
# - ,
#, urllib2
#(
#PROXY)
page_request = urllib2 . Request(url = target_link, headers = HEADERS)
# Request, Request instance,
# GET ,
# ...
try :
# ,
#, ,
page = urllib2 . urlopen(url = page_request) . read() . decode( "UTF-8" , "replace" )
# page ,
# unicode UTF-8 (, www.google.com) (
#Python 2.6 unicode - (!))
except Exception ,error:
# error
print str (error)
# ,
#( )
return "ERROR"
# ,
harvested_data = re . findall( r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''' , page)
# title
# , .
for data in harvested_data:
# harvested_data data
if data[ 0 ] . startswith( "/" ):
# data() /
harvested_data . remove(data)
# harvested_data
if ".google.com" in data[ 0 ]:
# data() .google.com
harvested_data . remove(data)
# harvested_data
return harvested_data
#
def get_and_parse_page (target_link):
# , –
global PROXY
# , PROXY
#
global HEADERS
# Headers
if PROXY is not None :
# PROXY None
proxy_handler = urllib2 . ProxyHandler( { "http" : "" + PROXY + "/" } )
# -
opener = urllib2 . build_opener(proxy_handler)
# opener c -
urllib2 . install_opener(opener)
# - ,
#, urllib2
#(
#PROXY)
page_request = urllib2 . Request(url = target_link, headers = HEADERS)
# Request, Request instance,
# GET ,
# ...
try :
# ,
#, ,
page = urllib2 . urlopen(url = page_request) . read() . decode( "UTF-8" , "replace" )
# page ,
# unicode UTF-8 (, www.google.com) (
#Python 2.6 unicode - (!))
except Exception ,error:
# error
print str (error)
# ,
#( )
return "ERROR"
# ,
harvested_data = re . findall( r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''' , page)
# title
# , .
for data in harvested_data:
# harvested_data data
if data[ 0 ] . startswith( "/" ):
# data() /
harvested_data . remove(data)
# harvested_data
if ".google.com" in data[ 0 ]:
# data() .google.com
harvested_data . remove(data)
# harvested_data
return harvested_data
#
def get_and_parse_page (target_link):
# , –
global PROXY
# , PROXY
#
global HEADERS
# Headers
if PROXY is not None :
# PROXY None
proxy_handler = urllib2 . ProxyHandler( { "http" : "" + PROXY + "/" } )
# -
opener = urllib2 . build_opener(proxy_handler)
# opener c -
urllib2 . install_opener(opener)
# - ,
#, urllib2
#(
#PROXY)
page_request = urllib2 . Request(url = target_link, headers = HEADERS)
# Request, Request instance,
# GET ,
# ...
try :
# ,
#, ,
page = urllib2 . urlopen(url = page_request) . read() . decode( "UTF-8" , "replace" )
# page ,
# unicode UTF-8 (, www.google.com) (
#Python 2.6 unicode - (!))
except Exception ,error:
# error
print str (error)
# ,
#( )
return "ERROR"
# ,
harvested_data = re . findall( r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''' , page)
# title
# , .
for data in harvested_data:
# harvested_data data
if data[ 0 ] . startswith( "/" ):
# data() /
harvested_data . remove(data)
# harvested_data
if ".google.com" in data[ 0 ]:
# data() .google.com
harvested_data . remove(data)
# harvested_data
return harvested_data
#
最後に、アプリケーションの本体を実装する番です。
def main ():
# ,
print "STARTED"
#
global THREADS_COUNT
global DEEP
global ENCODING
#
#
with open ( "requests.txt" ) as requests:
# requests
for request in requests:
# ,
# , ,
# , :)
request = request . translate( None , " \r\n " ) . decode(ENCODING, "replace" )
#
# ( )
empty_link = "www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
# ,
for i in xrange ( 0 , DEEP, 10 ):
# # 0 DEEP,
#
# 10, ..
# , .. 10, 20, 30 ( )
queue . put(empty_link . format(request = request . encode( "UTF-8" ), N = i))
#
# UTF-8 ( )
for _ in xrange (THREADS_COUNT):
#
thread_ = threading . Thread(target = worker)
# , target- ,
# ,
thread_ . start()
# start() ,
while threading . active_count() >1 :
# , 1 (,
# )
time . sleep( 1 )
# 1
print "FINISHED"
#
def main ():
# ,
print "STARTED"
#
global THREADS_COUNT
global DEEP
global ENCODING
#
#
with open ( "requests.txt" ) as requests:
# requests
for request in requests:
# ,
# , ,
# , :)
request = request . translate( None , " \r\n " ) . decode(ENCODING, "replace" )
#
# ( )
empty_link = "www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
# ,
for i in xrange ( 0 , DEEP, 10 ):
# # 0 DEEP,
#
# 10, ..
# , .. 10, 20, 30 ( )
queue . put(empty_link . format(request = request . encode( "UTF-8" ), N = i))
#
# UTF-8 ( )
for _ in xrange (THREADS_COUNT):
#
thread_ = threading . Thread(target = worker)
# , target- ,
# ,
thread_ . start()
# start() ,
while threading . active_count() >1 :
# , 1 (,
# )
time . sleep( 1 )
# 1
print "FINISHED"
#
def main ():
# ,
print "STARTED"
#
global THREADS_COUNT
global DEEP
global ENCODING
#
#
with open ( "requests.txt" ) as requests:
# requests
for request in requests:
# ,
# , ,
# , :)
request = request . translate( None , " \r\n " ) . decode(ENCODING, "replace" )
#
# ( )
empty_link = "www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
# ,
for i in xrange ( 0 , DEEP, 10 ):
# # 0 DEEP,
#
# 10, ..
# , .. 10, 20, 30 ( )
queue . put(empty_link . format(request = request . encode( "UTF-8" ), N = i))
#
# UTF-8 ( )
for _ in xrange (THREADS_COUNT):
#
thread_ = threading . Thread(target = worker)
# , target- ,
# ,
thread_ . start()
# start() ,
while threading . active_count() >1 :
# , 1 (,
# )
time . sleep( 1 )
# 1
print "FINISHED"
#
その結果、通常動作するマルチスレッドパーサーが得られます。 当然多くの欠点がありますが、美しく書かれていますので、コメントして申し訳ありません。
コード:
この記事 + ソース : sendspace.com/file/mw0pac
ロシア語コメント付きのコード : dumpz.org/15202
ウクライナのコメント付きのコード : dumpz.org/15201
PSはい、キュー(hello、cr0w)を使用したこの例は非合理的であるように思えます。 しかし、エラー処理は、正確に使用するのが最も簡単です。
PPSマテリアルは、不可fall性を主張しません。 当然、ここでは100%のbydlockコードであり、私が説明していることを理解していない、用語との混同、bydlokodなどです。 など しかし、あなたはそれをやり過ぎることができないものがあります-それは動作し、それから期待どおりに動作し、コードは明確でコメントアウトされているので、赤ちゃんにとっても明確になります。少なくとも誰かに役立つことを願っています...
© login999
uasc.org.ua