十 11
实在很简陋,许多控制功能都没有实现:)
不过也能表现大致的框架吧。
/*
 * A simple process pool
 *
 * The parent forks a fixed number of worker processes. Every worker is
 * connected to the parent through two pipes:
 *   - a task pipe   (parent -> worker) carrying the job payload, and
 *   - a notify pipe (worker -> parent) on which the worker writes a single
 *     byte once the job is finished.
 * The parent hands jobs to idle workers and uses select() on the notify
 * pipes of the busy workers to find out which of them became idle again.
 * Per-worker job counters live in an anonymous shared mapping so the parent
 * can report them on shutdown (SIGINT).
 *
 * Usage: <program> [worker_count]   (defaults to 10)
 */
#include <algorithm>
#include <iostream>
#include <queue>
#include <vector>

#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <unistd.h>

using namespace std;

int iProcessCount = 0;
int *pDispatchCount;

// Set from the SIGINT handler and polled by the dispatch loop in main().
static volatile sig_atomic_t g_iInterrupted = 0;

// SIGINT handler. Only records the request: stream output and exit() are not
// async-signal-safe, so reporting and cleanup are done by main().
void InterruptHanler(int iSigno)
{
    (void)iSigno;
    g_iInterrupted = 1;
}

// Prints how many jobs each worker has picked up so far.
static void PrintDispatchCount()
{
    cerr << "Dispatch Count: " << endl;
    for (int i = 0; i < iProcessCount; i++)
    {
        cerr << i << "\t" << pDispatchCount[i] << endl;
    }
}

// Sends SIGTERM to every forked worker and reaps it.
static void TerminateWorkers(const vector<pid_t> &vecPid)
{
    for (size_t i = 0; i < vecPid.size(); i++)
    {
        kill(vecPid[i], SIGTERM);
    }
    for (size_t i = 0; i < vecPid.size(); i++)
    {
        while (waitpid(vecPid[i], NULL, 0) < 0 && errno == EINTR)
        {
            // interrupted while waiting, try again
        }
    }
}

// Body of a worker process; never returns.
// Reads one job at a time from iTaskFD, performs it and acknowledges it with
// one byte on iNotifyFD. Leaves the process when the task pipe reaches EOF
// (the parent is gone) or when an I/O error occurs.
static void WorkerLoop(int iIndex, int iTaskFD, int iNotifyFD)
{
    int iCount = 0;
    while (true)
    {
        char sBuf[16] = {0};
        ssize_t iLen = read(iTaskFD, sBuf, sizeof(sBuf) - 1);
        if (iLen < 0 && errno == EINTR)
            continue;
        if (iLen <= 0)
            _exit(iLen == 0 ? 0 : 1);

        cout << iIndex << ": " << getpid() << ", " << ++iCount << "\t" << sBuf << endl;
        pDispatchCount[iIndex]++;

        //Do your work here
        int iMax = atoi(sBuf);
        usleep(iMax % 10000);

        if (write(iNotifyFD, "1", 1) != 1)
            _exit(1);
    }
}

//ToDo: replace two pipes with socketpair
int main(int argc, char **argv)
{
    if (argc == 2)
    {
        iProcessCount = atoi(argv[1]);
    }
    if (iProcessCount <= 0)
        iProcessCount = 10;

    // Anonymous shared mappings start zero-filled; each worker only ever
    // writes its own slot.
    void *pShared = mmap(0, sizeof(int) * iProcessCount, PROT_READ | PROT_WRITE,
                         MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (pShared == MAP_FAILED)
    {
        cerr << "mmap error" << endl;
        return 1;
    }
    pDispatchCount = static_cast<int *>(pShared);

    vector<int> vecTaskRead(iProcessCount), vecTaskWrite(iProcessCount);
    vector<int> vecNotifyRead(iProcessCount), vecNotifyWrite(iProcessCount);
    queue<int> queIdle;
    vector<int> vecBusy;
    vector<pid_t> vecPid;

    // All pipes are created before the first fork so that the descriptor
    // numbers are identical in the parent and in every worker.
    for (int i = 0; i < iProcessCount; i++)
    {
        int iTask[2];
        int iNotify[2];
        if (pipe(iTask) == -1 || pipe(iNotify) == -1)
        {
            cerr << "pipe error" << endl;
            return 1;
        }
        if (iNotify[0] >= FD_SETSIZE)
        {
            // FD_SET() on such a descriptor would write outside the fd_set.
            cerr << "too many processes for select()" << endl;
            return 1;
        }
        vecTaskRead[i] = iTask[0];
        vecTaskWrite[i] = iTask[1];
        vecNotifyRead[i] = iNotify[0];
        vecNotifyWrite[i] = iNotify[1];
    }

    for (int i = 0; i < iProcessCount; i++)
    {
        pid_t pid = fork();
        if (pid < 0)
        {
            cerr << "fork error" << endl;
            TerminateWorkers(vecPid);
            return 1;
        }
        if (pid == 0) //child
        {
            // Keep only this worker's own two pipe ends. Holding on to the
            // others would prevent EOF from ever being seen on any pipe.
            for (int j = 0; j < iProcessCount; j++)
            {
                close(vecTaskWrite[j]);
                close(vecNotifyRead[j]);
                if (j != i)
                {
                    close(vecTaskRead[j]);
                    close(vecNotifyWrite[j]);
                }
            }
            WorkerLoop(i, vecTaskRead[i], vecNotifyWrite[i]);
        }
        vecPid.push_back(pid);
        queIdle.push(i);
    }

    // The parent only uses the write end of the task pipes and the read end
    // of the notify pipes.
    for (int i = 0; i < iProcessCount; i++)
    {
        close(vecTaskRead[i]);
        close(vecNotifyWrite[i]);
    }

    // Installed after the forks so the workers keep the default dispositions.
    struct sigaction saInt;
    memset(&saInt, 0, sizeof(saInt));
    saInt.sa_handler = InterruptHanler;
    sigemptyset(&saInt.sa_mask);
    sigaction(SIGINT, &saInt, NULL);
    // A write to a worker that has died must fail with EPIPE, not kill us.
    signal(SIGPIPE, SIG_IGN);

    int iExitCode = 0;
    int iCounter = 0;
    while (!g_iInterrupted)
    {
        if (queIdle.empty() && vecBusy.empty())
        {
            cerr << "no workers left" << endl;
            iExitCode = 1;
            break;
        }

        if (!queIdle.empty())
        {
            //Dispatch work
            char sBuf[16] = {0};
            int iRand = rand();
            snprintf(sBuf, sizeof(sBuf), "%d", iRand);
            int iIndex = queIdle.front();
            queIdle.pop();

            size_t iLen = strlen(sBuf);
            ssize_t iWritten = write(vecTaskWrite[iIndex], sBuf, iLen);
            if (iWritten == static_cast<ssize_t>(iLen))
            {
                cout << "dispatch " << sBuf << " to " << iIndex << endl;
                vecBusy.push_back(iIndex);
            }
            else if (iWritten < 0 && errno == EINTR)
            {
                queIdle.push(iIndex); // nothing was sent, worker is still idle
                continue;
            }
            else
            {
                cerr << "write error, dropping worker " << iIndex << endl;
            }
        }

        cout << ++iCounter << "\t: queIdle.size = " << queIdle.size()
             << " vecBusy.size = " << vecBusy.size() << endl;

        fd_set set;
        FD_ZERO(&set);
        int iMaxFD = 0;
        for (size_t i = 0; i < vecBusy.size(); i++)
        {
            int iReadFD = vecNotifyRead[vecBusy[i]];
            FD_SET(iReadFD, &set);
            iMaxFD = max(iMaxFD, iReadFD);
        }

        // While idle workers remain only poll for completions, so that the
        // next job is handed out right away; block only when all are busy.
        struct timeval timeout;
        timeout.tv_sec = queIdle.empty() ? 1 : 0;
        timeout.tv_usec = 0;

        int iReady = select(iMaxFD + 1, &set, NULL, NULL, &timeout);
        if (iReady < 0)
        {
            if (errno == EINTR)
                continue; // the loop condition re-checks the interrupt flag
            cerr << "select error" << endl;
            iExitCode = 1;
            break;
        }
        if (iReady == 0)
            continue; // timeout, nothing finished

        for (vector<int>::iterator it = vecBusy.begin(); it != vecBusy.end();)
        {
            int iIndex = *it;
            int iReadFD = vecNotifyRead[iIndex];
            if (!FD_ISSET(iReadFD, &set))
            {
                ++it;
                continue;
            }

            // Consume the completion byte. Leaving it in the pipe would make
            // the descriptor readable again on the worker's next job and the
            // worker would be treated as idle while it is still busy.
            char cDone = 0;
            ssize_t iLen = read(iReadFD, &cDone, 1);
            if (iLen < 0 && errno == EINTR)
            {
                ++it; // still pending, picked up on the next pass
                continue;
            }

            it = vecBusy.erase(it);
            if (iLen == 1)
            {
                queIdle.push(iIndex);
            }
            else
            {
                // EOF or error: the worker closed its end, i.e. it is gone.
                cerr << "worker " << iIndex << " exited" << endl;
            }
        }
    }

    PrintDispatchCount();
    TerminateWorkers(vecPid);
    return iExitCode;
}