use main node for computation
This commit is contained in:
		
							
								
								
									
										128
									
								
								parallel.c
									
									
									
									
									
								
							
							
						
						
									
										128
									
								
								parallel.c
									
									
									
									
									
								
							@@ -10,14 +10,14 @@
 | 
			
		||||
#include <malloc.h>
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
 | 
			
		||||
#define DEBUG(msg, ...)
 | 
			
		||||
#define DEBUG INFO
 | 
			
		||||
#define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
 | 
			
		||||
//#define DEBUG(msg, ...) fprintf(stderr, "[   %10.3f] " msg, runtime(), ##__VA_ARGS__)
 | 
			
		||||
//#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)
 | 
			
		||||
#define DONE(x) *((int*)(x))
 | 
			
		||||
 | 
			
		||||
enum message_tag {
 | 
			
		||||
	PARALLEL_ORDER,
 | 
			
		||||
	PARALLEL_ORDER = 0,
 | 
			
		||||
	PARALLEL_RESULT,
 | 
			
		||||
	PARALLEL_SHUTDOWN,
 | 
			
		||||
	PARALLEL_GLOBAL_DATA
 | 
			
		||||
@@ -114,42 +114,34 @@ void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callbac
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int parallel_work(parallel_context *ctx)
 | 
			
		||||
int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
 | 
			
		||||
{
 | 
			
		||||
	// do nothing in non-mpi mode
 | 
			
		||||
	if(ctx->mpi_mode == 0)
 | 
			
		||||
		return 0;
 | 
			
		||||
 | 
			
		||||
	MPI_Status status;
 | 
			
		||||
	void *global_data = malloc(ctx->global_data_size);
 | 
			
		||||
	void *node_data = malloc(ctx->node_data_size);
 | 
			
		||||
	double jobtime;
 | 
			
		||||
	void *input_and_job_nr = malloc(ctx->input_size + sizeof(int));
 | 
			
		||||
	void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
 | 
			
		||||
	void *input = input_and_job_nr + sizeof(int);
 | 
			
		||||
	int *job_nr = (int *)input_and_job_nr;
 | 
			
		||||
	void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
 | 
			
		||||
	void *output = output_and_job_nr + sizeof(int);
 | 
			
		||||
	int *output_job_nr = (int *)output_and_job_nr;
 | 
			
		||||
	double jobtime;
 | 
			
		||||
	int result = 0;
 | 
			
		||||
	int message_present;
 | 
			
		||||
 | 
			
		||||
	// wait for global data
 | 
			
		||||
	MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
 | 
			
		||||
 | 
			
		||||
	DEBUG("Global data received\n");
 | 
			
		||||
 | 
			
		||||
	// initialize node_data (and do once-per-node computation)
 | 
			
		||||
	ctx->init(global_data, node_data);
 | 
			
		||||
 | 
			
		||||
	DEBUG("Initialization completed\n");
 | 
			
		||||
 | 
			
		||||
	while(1) {
 | 
			
		||||
	if(block) {
 | 
			
		||||
		MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
 | 
			
		||||
		          &status);
 | 
			
		||||
		message_present = 1;
 | 
			
		||||
	} else {
 | 
			
		||||
		MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
 | 
			
		||||
		          &message_present, &status);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
//		DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);
 | 
			
		||||
 | 
			
		||||
	if(message_present) {
 | 
			
		||||
		if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
 | 
			
		||||
			DEBUG("Shutting down\n");
 | 
			
		||||
			break;
 | 
			
		||||
			result = 1;
 | 
			
		||||
		} else if(status.MPI_TAG == PARALLEL_ORDER) {
 | 
			
		||||
			MPI_Recv(input_and_job_nr,
 | 
			
		||||
			         1, ctx->order_datatype,
 | 
			
		||||
@@ -172,14 +164,44 @@ int parallel_work(parallel_context *ctx)
 | 
			
		||||
			         1, ctx->result_datatype,
 | 
			
		||||
			         0, PARALLEL_RESULT, MPI_COMM_WORLD);
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		result = 2;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	free(input_and_job_nr);
 | 
			
		||||
	free(output_and_job_nr);
 | 
			
		||||
 | 
			
		||||
	return result;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int parallel_work(parallel_context *ctx)
 | 
			
		||||
{
 | 
			
		||||
	// do nothing in non-mpi mode
 | 
			
		||||
	if(ctx->mpi_mode == 0)
 | 
			
		||||
		return 0;
 | 
			
		||||
 | 
			
		||||
	void *global_data = malloc(ctx->global_data_size);
 | 
			
		||||
	void *node_data = malloc(ctx->node_data_size);
 | 
			
		||||
 | 
			
		||||
	// wait for global data
 | 
			
		||||
	MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
 | 
			
		||||
 | 
			
		||||
	DEBUG("Global data received\n");
 | 
			
		||||
 | 
			
		||||
	// initialize node_data (and do once-per-node computation)
 | 
			
		||||
	ctx->init(global_data, node_data);
 | 
			
		||||
 | 
			
		||||
	DEBUG("Initialization completed\n");
 | 
			
		||||
 | 
			
		||||
	int shutdown = 0;
 | 
			
		||||
	while(shutdown != 1) {
 | 
			
		||||
		shutdown = parallel_job(ctx, global_data, node_data, 1);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx->destroy(global_data, node_data);
 | 
			
		||||
 | 
			
		||||
	free(global_data);
 | 
			
		||||
	free(node_data);
 | 
			
		||||
	free(input_and_job_nr);
 | 
			
		||||
	free(output_and_job_nr);
 | 
			
		||||
 | 
			
		||||
	return 0;
 | 
			
		||||
}
 | 
			
		||||
@@ -188,9 +210,8 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 | 
			
		||||
{
 | 
			
		||||
	// in non-mpi-mode, just run init1, init2, forall(jobs) job
 | 
			
		||||
	if(ctx->mpi_mode == 0) {
 | 
			
		||||
		int result;
 | 
			
		||||
		void *node_data = malloc(ctx->node_data_size);
 | 
			
		||||
		result = ctx->init(global_data, node_data);
 | 
			
		||||
		int result = ctx->init(global_data, node_data);
 | 
			
		||||
		if(result != 0)
 | 
			
		||||
			goto cleanup_standalone;
 | 
			
		||||
 | 
			
		||||
@@ -271,8 +292,15 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 | 
			
		||||
 | 
			
		||||
		DEBUG("Global data sent\n");
 | 
			
		||||
 | 
			
		||||
		// we want to be able to run jobs ourselves, so initialize node_data
 | 
			
		||||
		void *node_data = malloc(ctx->node_data_size);
 | 
			
		||||
		ctx->init(global_data, node_data);
 | 
			
		||||
 | 
			
		||||
		void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
 | 
			
		||||
		void *output_message_buffer = malloc(ctx->output_size + sizeof(int));
 | 
			
		||||
		int *active_jobs = malloc(sizeof(int)*ctx->size);
 | 
			
		||||
		memset(active_jobs, 0, ctx->size*sizeof(int));
 | 
			
		||||
		int active_worker_nodes = ctx->size - 1;  // we don't count ourselves, since we can't shut ourselves down
 | 
			
		||||
 | 
			
		||||
		// find next unfinished job
 | 
			
		||||
		int current = 0;
 | 
			
		||||
@@ -280,7 +308,7 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 | 
			
		||||
			current++;
 | 
			
		||||
 | 
			
		||||
		// assign initial jobs, 2 for each worker thread
 | 
			
		||||
		for(int i = 0; i < 2*(ctx->size-1); i++) {
 | 
			
		||||
		for(int i = 0; i < 2*ctx->size; i++) {
 | 
			
		||||
			if(current >= njobs) // all jobs are assigned
 | 
			
		||||
				break;
 | 
			
		||||
 | 
			
		||||
@@ -289,42 +317,54 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 | 
			
		||||
			*((int*)input_message_buffer) = current;
 | 
			
		||||
			memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
 | 
			
		||||
			MPI_Send(input_message_buffer, 1, ctx->order_datatype,
 | 
			
		||||
			         i%(ctx->size-1)+1, PARALLEL_ORDER, MPI_COMM_WORLD);
 | 
			
		||||
			         i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
 | 
			
		||||
 | 
			
		||||
			DEBUG("Job %d sent to node %d\n", current, i%(ctx->size-1)+1);
 | 
			
		||||
			DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
 | 
			
		||||
			active_jobs[i%ctx->size]++;
 | 
			
		||||
			current++;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		MPI_Status status;
 | 
			
		||||
		int active_worker_nodes = ctx->size - 1;
 | 
			
		||||
		while(1) {
 | 
			
		||||
			MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
 | 
			
		||||
			if(status.MPI_TAG == PARALLEL_RESULT) {
 | 
			
		||||
		int message_present;
 | 
			
		||||
		while(active_jobs[0] != 0 || active_worker_nodes != 0) {
 | 
			
		||||
			MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
 | 
			
		||||
			DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);
 | 
			
		||||
			if(!message_present) {
 | 
			
		||||
				// if there are no more messages to process, we can run a job ourselves before returning to managing
 | 
			
		||||
				DEBUG("Start running job myself\n");
 | 
			
		||||
				int result = parallel_job(ctx, global_data, node_data, 0);
 | 
			
		||||
				DEBUG("Finished running job myself, result = %d\n");
 | 
			
		||||
			} else if(status.MPI_TAG == PARALLEL_RESULT) {
 | 
			
		||||
				MPI_Recv(output_message_buffer, 1, ctx->result_datatype,
 | 
			
		||||
				         MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);
 | 
			
		||||
 | 
			
		||||
				DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);
 | 
			
		||||
 | 
			
		||||
				int node = status.MPI_SOURCE;
 | 
			
		||||
				int id = *((int*)output_message_buffer);
 | 
			
		||||
				memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
 | 
			
		||||
				DONE(alljobs + id*itemsize) = 1;
 | 
			
		||||
				completed++;
 | 
			
		||||
				active_jobs[node]--;
 | 
			
		||||
 | 
			
		||||
				// todo: deal with unresponsive nodes
 | 
			
		||||
				// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
 | 
			
		||||
				// if oversubscribe counter is at limit, shut node down instead
 | 
			
		||||
				//
 | 
			
		||||
 | 
			
		||||
				if(current >= njobs) { // all jobs are assigned, shut down node
 | 
			
		||||
					INFO("job %d completed by node %d, shut down\n", id, status.MPI_SOURCE);
 | 
			
		||||
					MPI_Send(NULL, 0, MPI_BYTE, status.MPI_SOURCE, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
 | 
			
		||||
					active_worker_nodes--;
 | 
			
		||||
					if(!active_worker_nodes)
 | 
			
		||||
						break;
 | 
			
		||||
				if(current >= njobs) { // all jobs are assigned, try to shut down node
 | 
			
		||||
					// don't try to shut down ourselves, and only if it has no other jobs to do
 | 
			
		||||
					if(node != 0 && active_jobs[node] == 0) {
 | 
			
		||||
						MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
 | 
			
		||||
						active_worker_nodes--;
 | 
			
		||||
						INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
					INFO("job %d completed by node %d, continues with %d\n", id, status.MPI_SOURCE, current);
 | 
			
		||||
					INFO("job %d completed by node %d, continues with %d\n", id, node, current);
 | 
			
		||||
					*((int*)input_message_buffer) = current;
 | 
			
		||||
					memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
 | 
			
		||||
					MPI_Send(input_message_buffer, 1, ctx->order_datatype,
 | 
			
		||||
					         status.MPI_SOURCE, PARALLEL_ORDER, MPI_COMM_WORLD);
 | 
			
		||||
					         node, PARALLEL_ORDER, MPI_COMM_WORLD);
 | 
			
		||||
					active_jobs[node]++;
 | 
			
		||||
					current++;
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
@@ -336,6 +376,8 @@ int parallel_run(parallel_context *ctx, const void *global_data, const void *inp
 | 
			
		||||
 | 
			
		||||
		free(input_message_buffer);
 | 
			
		||||
		free(output_message_buffer);
 | 
			
		||||
		free(node_data);
 | 
			
		||||
		free(active_jobs);
 | 
			
		||||
 | 
			
		||||
	cleanup_mpi:
 | 
			
		||||
		munmap(alljobs, njobs*itemsize);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user