問題の声明
もちろん、コードを書き始める前に、適切なタスクを設定する必要があります。プログラムの要件は次のとおりです。
- 計算は複数のマシンで並行して実行する必要があります(ローカルネットワーク内)
- 計算はマルチスレッドである必要があります
- 断続的に計算が行われるコンピューターへのアクセス
つまり、複数のコンピューター(見かけ上はワークステーション)で実行されるプログラムを作成し、積分をカウントする指示を待ってから、それを読み取り、カウントを求めた人に結果を送り返す必要があります。
ネットワークを扱う必要があるため、積分を計算するために確率的方法を選択します。 モンテカルロ法を使用します。 並列化は簡単で、2つの方法が明らかです。
- セグメントごと、つまり、各計算機は特定のセグメント上でnポイントを生成します
- 生成されたポイントの数、つまり、各計算機は、積分区間全体でn /(計算機の数)ポイントを生成します
特に動機付けせずに2番目の方法を選択します。セグメントの最初と最後を計算機に渡す必要はありません。
そのため、ジョブを待機し、結果を読み取って返すサーバー(計算機)が必要です。 また、ユーザーが対話するクライアントも必要です。
コンピューティング用のサーバーを見つける方法 サーバーのIPアドレスが動的であることを考慮すると、すべてのIPを暗記するか、どこかに書き留めることは最良の選択肢ではありません。 ソリューションは簡単です。ブロードキャスト要求を使用して、現在利用可能なサーバーを見つけます。
2番目の問題、情報交換に使用するトランスポートはどれですか? ブロードキャストリクエストの場合、明らかにUDPを使用します。 ただし、クライアントとサーバーの相互作用には、UDPとTCPの両方を使用できます。 ただし、TCPを使用すると、接続の状態を確認する必要がないため、問題が少なくなります。 一方、ソケットが閉じた場合、OSはこれを検出して通知します。
対話の最終バージョンは次のとおりです。サーバーはTCPソケットでの接続を待機し、同時にブロードキャスト要求に応答して、クライアントに存在を示します。 クライアントが接続してタスクを指定するとすぐに、ブロードキャスト要求への応答を一時停止し、計算し、クライアントに応答して、サイクルを再開します。 クライアント:リクエストを送信し、サーバーのリストを作成し、サーバー間でタスクを共有し、結果を受信してユーザーに表示します。
サーバー
最初にサーバーを書きましょう。 まず、クライアントからのリクエストを待機するポートについて合意します。ポート番号38199とします。次に、クライアントにサーバーにジョブを送信するための構造をアナウンスします。
#define RCVPORT 38199
#define FUNC(x) x*x // x^2
//
typedef struct {
int limits;
int numoftry;
} task_data_t;
* This source code was highlighted with Source Code Highlighter .
上記のコードからわかるように、クライアントはサーバーに統合の上限とサーバーが行う必要のある試行回数を送信します。
複数のスレッドで計算を行うことは明らかなので、計算機スレッドの引数構造と関数を作成します。
//
typedef struct {
int limits; //
long numoftry; //
long double *results; //
} thread_args_t;
//
void *calculate( void *arg) {
// thread_args_t
thread_args_t *tinfo = (thread_args_t*) arg;
long double result = 0;
int xlim = tinfo->limits;
int trys = tinfo->numoftry;
unsigned a = xlim;
for ( int i = 0; i < trys; ++i) {
int div = rand_r(&a);
int div2 = rand_r(&a);
double x = div % xlim + (div2/(div2*1.0 + div*1.0)) ;
result += FUNC(x);
}
*(tinfo->results) = result;
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
rand_r関数(unsigned int * seedp)が使用されていることに注意してください 。 この関数はローカル変数を使用して呼び出し間の中間値を格納するためです。 rand()関数を使用する場合のように、すべてのスレッドにグローバル変数の使用を許可することはできません。これは相互のブロッキングを引き起こすためです。
すべての計算スレッドを開始した後、メインスレッドはpthread_join()関数でハングし、何もできません。計算中に空の計算を避けるためにクライアントが停止した場合、クライアントのステータスを確認する別のスレッドを実行します。
//
typedef struct {
int sock; //
pthread_t *calcthreads; //
int threadnum; //
} checker_args_t;
// SIGUSR1
void thread_cancel( int signo) {
pthread_exit(PTHREAD_CANCELED);
}
//
void *client_check( void *arg) {
// checker_args_t
checker_args_t *args = (checker_args_t*) arg;
char a[10];
recv(args->sock, &a, 10, 0); // TCP,
// , recv -1
int st;
for ( int i = 0; i < args->threadnum; ++i)
st = pthread_kill(args->calcthreads[i], SIGUSR1); // SIGUSR1
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
コンピューティングスレッドの強制終了には、シグナルを使用することにしました。それらの作業の結果としてメモリが割り当てられないため、心配する必要はありません。 ただし、 pthread_cancel()関数とpthread_cleanup_push()およびpthread_cleanup_pop()マクロを使用する方がより適切です。 もちろん、 thread_cancel()関数は、シグナルを受信したときに実行されます。 また、プログラムの開始時に、スレッドを開始する前に、信号処理の正しいマスクを設定する必要があることを忘れないでください。そうしないと、プログラムを終了する危険があります。
次に、ブロードキャストリクエストに応答するスレッドを作成しましょう。 メインスレッドが落ち着いてハングアップし、クライアントと追加のスレッドがその瞬間にリクエストに応答できるようにします。
void *listen_broadcast( void *arg) {
int *isbusy = arg;
// broadcast
int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
if (sockbrcast == -1) {
perror( "Create broadcast socket failed" );
exit(EXIT_FAILURE);
}
// broadcast
int port_rcv = RCVPORT;
struct sockaddr_in addrbrcast_rcv;
bzero(&addrbrcast_rcv, sizeof (addrbrcast_rcv));
addrbrcast_rcv.sin_family = AF_INET;
addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
addrbrcast_rcv.sin_port = htons(port_rcv);
//
if (bind(sockbrcast, ( struct sockaddr *) &addrbrcast_rcv,
sizeof (addrbrcast_rcv)) < 0) {
perror( "Bind broadcast socket failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
int msgsize = sizeof ( char ) * 18;
char hellomesg[18];
bzero(hellomesg, msgsize);
// broadcast'
fcntl(sockbrcast, F_SETFL, O_NONBLOCK);
//
fd_set readset;
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
//
struct timeval timeout;
timeout.tv_sec = 3;
timeout.tv_usec = 0;
struct sockaddr_in client;;
bzero(&client, sizeof (client));
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
char helloanswer[18];
bzero(helloanswer, msgsize);
strcpy(helloanswer, "Hello Client" );
int sockst = 1;
while (sockst > 0) {
sockst = select (sockbrcast + 1, &readset, NULL, &readset, NULL);
if (sockst == -1) {
perror( "Broblems on broadcast socket" );
exit(EXIT_FAILURE);
}
int rdbyte = recvfrom(sockbrcast, ( void *) hellomesg, msgsize,MSG_TRUNC,
( struct sockaddr*) &client,
&servaddrlen);
if (rdbyte == msgsize && strcmp(hellomesg, "Hello Integral" ) == 0 &&
*isbusy == 0) {
if (sendto(sockbrcast, helloanswer, msgsize, 0,
( struct sockaddr*) &client, sizeof ( struct sockaddr_in)) < 0) {
perror( "Sending answer" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
}
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
}
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
ここではすべてが簡単で、ソケットを作成してリクエストを待ちます。 リクエストを受け取った-私たちはそれに応えます。 そして、要求に答えるかどうかにかかわらず、1つの複雑さはisbusy変数の値によって決まります。
最後にメインになりました:
int main( int argc, char ** argv) {
// - -
if (argc > 2) {
fprintf(stderr, "Usage: %s [numofcpus]\n" , argv[0]);
exit(EXIT_FAILURE);
}
int numofthread;
if (argc == 2) {
numofthread = atoi(argv[1]);
if (numofthread < 1) {
fprintf(stderr, "Incorrect num of threads!\n" );
exit(EXIT_FAILURE);
}
fprintf(stdout, "Num of threads forced to %d\n" , numofthread);
} else {
// , -
numofthread = sysconf(_SC_NPROCESSORS_ONLN);
if (numofthread < 1) {
fprintf(stderr, "Can't detect num of processors\n"
"Continue in two threads\n" );
numofthread = 2;
}
fprintf(stdout, "Num of threads detected automatically it's %d\n\n" ,
numofthread);
}
* This source code was highlighted with Source Code Highlighter .
引数の検証は説明できないと思います...
信号のマスクを設定し、スレッドを実行してリクエストをリッスンします。
struct sigaction cancel_act;
memset(&cancel_act, 0, sizeof (cancel_act));
cancel_act.sa_handler = thread_cancel;
sigfillset(&cancel_act.sa_mask);
sigaction(SIGUSR1, &cancel_act, NULL);
// broadcast'
pthread_t broadcast_thread;
int isbusy = 1; //(int*) malloc(sizeof(int));
// broadcast
// 0 - , 1-
isbusy = 1;
if (pthread_create(&broadcast_thread, NULL, listen_broadcast, &isbusy)) {
fprintf(stderr, "Can't create broadcast listen thread" );
perror( "Detail:" );
exit(EXIT_FAILURE);
}
* This source code was highlighted with Source Code Highlighter .
次に、クライアントが接続を確立するためのソケットを作成します。
int listener;
struct sockaddr_in addr;
listener = socket(PF_INET, SOCK_STREAM, 0);
if (listener < 0) {
perror( "Can't create listen socket" );
exit(EXIT_FAILURE);
}
addr.sin_family = AF_INET;
addr.sin_port = htons(RCVPORT);
addr.sin_addr.s_addr = INADDR_ANY;
int a = 1;
// SO_REUSEADDR
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &a, sizeof (a)) < 0) {
perror( "Set listener socket options" );
exit(EXIT_FAILURE);
}
//
if (bind(listener, ( struct sockaddr*) &addr, sizeof (addr)) < 0) {
perror( "Can't bind listen socket" );
exit(EXIT_FAILURE);
}
//
if (listen(listener, 1) < 0) {
perror( "Eror listen socket" );
exit(EXIT_FAILURE);
}
* This source code was highlighted with Source Code Highlighter .
計算サイクルを開始します。
//
int needexit = 0;
while (needexit == 0) {
fprintf(stdout, "\nWait new connection...\n\n" );
int client;
isbusy = 0; //
struct sockaddr_in addrclient;
socklen_t addrclientsize = sizeof (addrclient);
client = accept(listener, ( struct sockaddr*)&addrclient,
&addrclientsize);
if (client < 0) {
perror( "Client accepting" );
}
* This source code was highlighted with Source Code Highlighter .
別のスレッドがブロードキャストリクエストを処理するため、静かに受け入れます。
クライアントが接続した後、クライアントからのデータを確認し、計算を開始します。
isbusy = 1; //
task_data_t data;
int read_bytes = recv(client, &data, sizeof (data), 0);
if (read_bytes != sizeof (data) || data.limits < 1 || data.numoftry < 1) {
fprintf(stderr, "Invalid data from %s on port %d, reset peer\n" ,
inet_ntoa(addrclient.sin_addr), ntohs(addrclient.sin_port));
close(client);
isbusy = 0;
} else {
fprintf(stdout, "New task from %s on port %d\nlimits: %d\n"
"numoftrys: %d\n" , inet_ntoa(addrclient.sin_addr),
ntohs(addrclient.sin_port), data.limits, data.numoftry);
thread_args_t *tinfo;
pthread_t *calc_threads =
(pthread_t*) malloc( sizeof (pthread_t) * numofthread);
int threads_trys = data.numoftry % numofthread;
long double *results =
( long double *) malloc( sizeof ( long double ) * numofthread);
tinfo = (thread_args_t*) malloc( sizeof (thread_args_t) *
numofthread);
//
int numofthreadtry = data.numoftry / numofthread + 1;
for ( int i = 0; i < numofthread; ++i) {
tinfo[i].limits = data.limits;
tinfo[i].numoftry = numofthreadtry;
tinfo[i].results = &results[i];
if (pthread_create(&calc_threads[i], NULL, calculate, &tinfo[i])
!= 0) {
fprintf(stderr, "Can't create thread by num %d" , i);
perror( "Detail:" );
exit(EXIT_FAILURE);
}
}
//
checker_args_t checker_arg;
checker_arg.calcthreads = calc_threads;
checker_arg.threadnum = numofthread;
checker_arg.sock = client;
pthread_t checker_thread;
if (pthread_create(&checker_thread, NULL, client_check,
&checker_arg) != 0) {
fprintf(stderr, "Can't create checker thread" );
perror( "Detail:" );
exit(EXIT_FAILURE);
}
int iscanceled = 0; // ?
int *exitstat;
for ( int i = 0; i < numofthread; ++i) {
pthread_join(calc_threads[i], ( void *) &exitstat);
if (exitstat == PTHREAD_CANCELED)
iscanceled = 1; //
}
if (iscanceled != 1) {
long double *res = ( long double *) malloc( sizeof ( long double ));
bzero(res, sizeof ( long double ));
*res = 0.0;
for ( int i = 0; i < numofthread; ++i)
*res += results[i];
pthread_kill(checker_thread, SIGUSR1);
if (send(client, res, sizeof ( long double ), 0) < 0) {
perror( "Sending error" );
}
close(client);
free(res);
//free(checker_arg);
free(results);
free(calc_threads);
free(tinfo);
isbusy = 0;
fprintf(stdout, "Calculate and send finish!\n" );
} else {
fprintf(stderr, "Client die!\n" );
close(client);
//free(checker_arg);
free(results);
free(calc_threads);
free(tinfo);
}
}
}
return (EXIT_SUCCESS);
}
* This source code was highlighted with Source Code Highlighter .
残りのコードは簡単です:
- スレッドの計算を開始します
- スレッドチェック状態を実行する
- すべてが解決するまで待ちます。
- クライアントに対応します
- 計算サイクルを新たに開始する
クライアントとのソケットが閉じると終了するため、チェックスレッドを完了する必要がないことに注意してください。
すべてのサーバーの準備ができました。
お客様
クライアントは次のように配置されます。
- サーバーのリストを作成します
- 各サーバーで作業するために、スレッドを作成します
- 結果を待って終了します
各サーバーで作業するには、個別のスレッドを作成すると便利です。これにより、非同期で作業できるようになります。
サーバーと同様に、データ交換の構造を宣言します。 サーバーで動作するスレッドの引数構造と同様に。
//
typedef struct {
int limits;
int numoftry;
} task_data_t;
//
typedef struct {
int limits; //
int numoftry; //
struct sockaddr_in *server; //
long double *results; //
} thread_args_t;
* This source code was highlighted with Source Code Highlighter .
クライアントでは、サーバーがパケットの送信元に応答するため、リッスン用のポートを設定する必要はありません。OSから割り当てられたものを使用してみましょう。 したがって、サーバーで機能する関数(スレッド):
//
void *send_thread( void *arg) {
thread_args_t *task_data = (thread_args_t*) arg;
int servsock = socket(PF_INET, SOCK_STREAM, 0);
if (servsock < 0) {
perror( "Create new socket to server" );
exit(EXIT_FAILURE);
}
struct sockaddr_in listenaddr;
listenaddr.sin_family = AF_INET;
listenaddr.sin_addr.s_addr = INADDR_ANY;
listenaddr.sin_port = 0;
if (bind(servsock, ( struct sockaddr*) &listenaddr, sizeof (listenaddr)) < 0) {
perror( "Can't create listen socket" );
exit(EXIT_FAILURE);
}
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
if (connect(servsock, ( struct sockaddr*)task_data->server,
servaddrlen) < 0) {
perror( "Connect to server failed!" );
exit(EXIT_FAILURE);
}
task_data_t senddata;
senddata.limits = task_data->limits;
senddata.numoftry = task_data->numoftry;
if (send(servsock, &senddata, sizeof (senddata), 0) < 0) {
perror( "Sending data to server failed" );
exit(EXIT_FAILURE);
}
int recv_byte = recv(servsock, task_data->results, sizeof ( long double ), 0);
if (recv_byte == 0) {
fprintf(stderr, "Server %s on port %d die!\nCancel calculate, on all" ,
inet_ntoa(task_data->server->sin_addr),
ntohs(task_data->server->sin_port));
exit(EXIT_FAILURE);
}
fprintf(stdout, "Server %s on port %d finish!\n" ,
inet_ntoa(task_data->server->sin_addr),
ntohs(task_data->server->sin_port));
return NULL;
}
* This source code was highlighted with Source Code Highlighter .
mainはメインサーバーに非常によく似ていますが、ある程度詳細にコメント化されていますが、あまり説明しません。
int main( int argc, char ** argv) {
if (argc < 3) {
fprintf(stderr, "Usage: %s limits numoftry [maxserv]\n" , argv[0]);
exit(EXIT_FAILURE);
}
int numoftry = atoi(argv[2]);
if (numoftry == 0) {
fprintf(stderr, "Num of try is invalid\n" );
exit(EXIT_FAILURE);
}
int maxservu = 1000000;
if (argc == 4) {
maxservu = atoi(argv[3]);
if (maxservu < 1) {
fprintf(stderr, "Error number of max servers\n" );
exit(EXIT_FAILURE);
}
}
int limits = atoi(argv[1]);
if (limits == 0) {
fprintf(stderr, "Limits is invalid\n" );
exit(EXIT_FAILURE);
}
// broadcast
int sockbrcast = socket(PF_INET, SOCK_DGRAM, 0);
if (sockbrcast == -1) {
perror( "Create broadcast socket failed" );
exit(EXIT_FAILURE);
}
// broadcast
int port_rcv = 0;
struct sockaddr_in addrbrcast_rcv;
bzero(&addrbrcast_rcv, sizeof (addrbrcast_rcv));
addrbrcast_rcv.sin_family = AF_INET;
addrbrcast_rcv.sin_addr.s_addr = htonl(INADDR_ANY);
addrbrcast_rcv.sin_port = 0; //htons(port_rcv);
//
if (bind(sockbrcast, ( struct sockaddr *) &addrbrcast_rcv,
sizeof (addrbrcast_rcv)) < 0) {
perror( "Bind broadcast socket failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
// broadcast
int port_snd = 38199;
struct sockaddr_in addrbrcast_snd;
bzero(&addrbrcast_snd, sizeof (addrbrcast_snd));
addrbrcast_snd.sin_family = AF_INET;
addrbrcast_snd.sin_port = htons(port_snd);
addrbrcast_snd.sin_addr.s_addr = htonl(0xffffffff);
// broadcast
int access = 1;
if (setsockopt(sockbrcast, SOL_SOCKET, SO_BROADCAST,
( const void *) &access, sizeof (access)) < 0) {
perror( "Can't accept broadcast option at socket to send" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
int msgsize = sizeof ( char ) * 18;
void *hellomesg = malloc(msgsize);
bzero(hellomesg, msgsize);
strcpy(hellomesg, "Hello Integral" );
// broadcast
if (sendto(sockbrcast, hellomesg, msgsize, 0,
( struct sockaddr*) &addrbrcast_snd, sizeof (addrbrcast_snd)) < 0) {
perror( "Sending broadcast" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
// broadcast'
fcntl(sockbrcast, F_SETFL, O_NONBLOCK);
//
fd_set readset;
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
//
struct timeval timeout;
timeout.tv_sec = 3;
timeout.tv_usec = 0;
struct sockaddr_in *servers =
( struct sockaddr_in*) malloc( sizeof ( struct sockaddr_in));
bzero(servers, sizeof ( struct sockaddr_in));
int servcount = 0;
int maxserv = 1;
socklen_t servaddrlen = sizeof ( struct sockaddr_in);
// servers
while ( select (sockbrcast + 1, &readset, NULL, &readset, &timeout) > 0) {
int rdbyte = recvfrom(sockbrcast, ( void *) hellomesg, msgsize,MSG_TRUNC,
( struct sockaddr*) &servers[servcount],
&servaddrlen);
if (rdbyte == msgsize && strcmp(hellomesg, "Hello Client" ) == 0) {
servcount++;
if (servcount >= maxserv) {
servers = realloc(servers,
sizeof ( struct sockaddr_in) * (maxserv + 1));
if (servers == NULL) {
perror( "Realloc failed" );
close(sockbrcast);
exit(EXIT_FAILURE);
}
bzero(&servers[servcount], servaddrlen);
maxserv++;
}
FD_ZERO(&readset);
FD_SET(sockbrcast, &readset);
}
}
int i;
if (servcount < 1) {
fprintf(stderr, "No servers found!\n" );
exit(EXIT_FAILURE);
}
if (argc > 3 && maxservu <= servcount)
servcount = maxservu;
for (i = 0; i < servcount; ++i) {
printf( "Server answer from %s on port %d\n" ,
inet_ntoa(servers[i].sin_addr), ntohs(servers[i].sin_port));
}
printf( "\n" );
free(hellomesg);
long double *results =
( long double *) malloc( sizeof ( long double ) * servcount);
//
pthread_t *tid = (pthread_t*) malloc( sizeof (pthread_t) * servcount);
for (i = 0; i < servcount; ++i) {
thread_args_t *args = (thread_args_t*) malloc ( sizeof (thread_args_t));
args->limits = limits;
args->numoftry = numoftry / servcount + 1;
args->results = &results[i];
args->server = &servers[i];
if (pthread_create(&tid[i], NULL, send_thread, args) != 0) {
perror( "Create send thread failed" );
exit(EXIT_FAILURE);
}
}
long double res = 0;
//
for (i = 0; i < servcount; ++i)
pthread_join(tid[i], NULL);
//
for (i = 0; i < servcount; ++i)
res += results[i];
res /= numoftry;
res *= limits;
free(servers);
printf( "\nResult: %Lf\n" , res);
return (EXIT_SUCCESS);
}
* This source code was highlighted with Source Code Highlighter .
性能試験
スレッド数に応じたパフォーマンス
同じコンピューターで、同じ入力データを使用して、異なるスレッド数でプログラムを実行しました。
パラメーター2 1000000000でクライアントを開始します。 パラメーター1、2、4、8のサーバー。
結果:それぞれ0m37.063s、0m20.576s、0m20.329s、0m21.029s。 時間が示した結果から、サーバーを待機する4秒を差し引く必要があることに注意してください。
コア(TM)2 Duo CPU T5470上のプロセッサー 。 予想どおり、プロセッサには2つのコアがあり、したがって3つ以上のスレッドを起動しても意味がありません。したがって、スレッド数を2倍にすると、
二重加速。
また、 Asus 1215pでテストしましたが、結果は上記と同じスレッド数に依存します
車の数に応じたパフォーマンス
ここで、1、2、4、8、16台のコンピューターでサーバーを起動しました。スレッドの数はコアの数、つまり2つのスレッドに等しくなります。
結果は次のとおりです:0m10.268s、0m5.122s、0m2.487s、0m1.265、0m0.766s。
再び、予想される結果が得られます。新しいコンピューターごとに、計算が2倍に加速されます。
合計
その結果、積分を同時に計算し、操作中に発生する可能性があるあらゆる種類のトラブルに耐えるプログラムを取得します。 残念ながら、記事のサイズが大きいため、すべてを詳細に検討することはできませんでした。 しかし、私の例が、私に発生したいくつかのエラーを回避するために新規参入者に役立つことを願っています。 makeファイルを含むプログラムのソースは、 ここにあります 。
ご清聴ありがとうございました!