11

实在很简陋,许多控制功能都没有实现:)
不过也能表现大致的框架吧。

/*
 * A simple process pool
 */
#include <iostream>
#include <queue>
#include <vector>
#include <algorithm>
 
#include <sys/wait.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
 
#include <sys/mman.h>
#include <sys/select.h>
 
using namespace std;
 
int iProcessCount = 0;
int *pDispatchCount;
 
void InterruptHanler(int iSigno)
{
	//ToDo: kill all children
	cerr << "Dispatch Count: " << endl;
	for( unsigned int i = 0; i < iProcessCount; i++ )
	{
		cerr << i << "\t" << pDispatchCount[i] << endl;
	}
	exit(0);
}
 
//ToDo: replace two pipes with socketpair
int main(int argc, char **argv)
{
	if( argc == 2 )
	{
		iProcessCount = atoi(argv[1]);
	}
 
	if( iProcessCount <= 0 )
		iProcessCount = 10;
 
	int (*iPipeFD)[2] = new(int[iProcessCount][2]);
	int (*iNotifyPipeFD)[2] = new(int[iProcessCount][2]);;
	pDispatchCount = (int *)mmap(0, sizeof(int) * iProcessCount, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
 
	queue<int> queIdle;
	vector<int> vecBusy;
 
	for( unsigned int i = 0; i < iProcessCount; i++ )
	{
		if( pipe(iPipeFD[i]) == -1 )
		{
			cerr << "pipe error" << endl;
			return 0;	
		}
 
		if( pipe(iNotifyPipeFD[i]) == -1 )
		{
			cerr << "pipe error" << endl;
			return 0;	
		}
	}
 
	for( unsigned int i = 0; i < iProcessCount; i++ )
	{
		pid_t pid = fork();
		if( pid < 0 )
		{
			cerr << "fork error" << endl;
			return 0;
		}
 
		if( pid == 0 )	//child
		{
			int iCount = 0;
			while(1)
			{
				char sBuf[16] = {0};
				read( iPipeFD[i][0], sBuf, 15 );
				cout << i << ": " << getpid() << ", " << ++iCount << "\t" << sBuf << endl;
				pDispatchCount[i]++;
 
				//Do your work here
				int iMax = atoi(sBuf);
				usleep(iMax % 10000);
 
				write( iNotifyPipeFD[i][1], "1", 1);
			}
		}
 
		queIdle.push(i);
	}
 
	signal( SIGINT, InterruptHanler );
 
	int iCounter = 0;
	while( 1 ) 
	{
		if( queIdle.size() )
		{
			//Dispatch work
			char sBuf[16] = {0};
			int iRand = rand();
			snprintf(sBuf, 16, "%d", iRand);
 
			int iIndex = queIdle.front();
			queIdle.pop();
			write( iPipeFD[iIndex][1], sBuf, strlen(sBuf) );
			cout << "dispatch " << sBuf << " to " << iIndex << endl;
 
			vecBusy.push_back(iIndex);
		}
 
		cout << ++iCounter << "\t: queIdle.size = " << queIdle.size() << " vecBusy.size = " << vecBusy.size() << endl;
		//if( vecBusy.size() )
		{
			fd_set set;
			FD_ZERO(&set);
 
			int iMaxFD = 0;	
			for( unsigned int i = 0; i < vecBusy.size(); i++ )
			{
				int iIndex = vecBusy[i];
				FD_SET( iNotifyPipeFD[iIndex][0], &set);
				iMaxFD = max( iMaxFD, iNotifyPipeFD[iIndex][0]);
			}
 
			struct timeval timeout;
			timeout.tv_sec = 1;
			timeout.tv_usec = 0;
			select(iMaxFD + 1, &set, NULL, NULL, &timeout);
 
			for( vector<int>::iterator it = vecBusy.begin(); it != vecBusy.end(); )
			{
				int iIndex = *it;
				int iReadFD = iNotifyPipeFD[iIndex][0];
				if( FD_ISSET( iReadFD, &set ) )
				{
					queIdle.push(iIndex);
					it = vecBusy.erase(it);
				}
				else
				{
					it++;
				}
			}
		}
	}
 
	return 0;
}
preload preload preload

无觅相关文章插件,快速提升流量