new, simpler approach to parallelization

Florian Stecker 2022-06-15 15:17:54 +02:00
parent 429f0890d6
commit ac80bc9f3f
13 changed files with 76 additions and 824 deletions

View File

@@ -1,135 +0,0 @@
euler.ma.utexas.edu
fac1.ma.utexas.edu
fac4.ma.utexas.edu
fac8.ma.utexas.edu
fac9.ma.utexas.edu
frog.ma.utexas.edu
gummo.ma.utexas.edu
iguana.ma.utexas.edu
lab10.ma.utexas.edu
lab11.ma.utexas.edu
lab12.ma.utexas.edu
lab13.ma.utexas.edu
lab14.ma.utexas.edu
lab15.ma.utexas.edu
lab16.ma.utexas.edu
lab17.ma.utexas.edu
lab18.ma.utexas.edu
lab19.ma.utexas.edu
lab1.ma.utexas.edu
lab20.ma.utexas.edu
lab21.ma.utexas.edu
lab22.ma.utexas.edu
lab23.ma.utexas.edu
lab24.ma.utexas.edu
lab25.ma.utexas.edu
lab26.ma.utexas.edu
lab27.ma.utexas.edu
lab28.ma.utexas.edu
lab29.ma.utexas.edu
lab2.ma.utexas.edu
lab30.ma.utexas.edu
lab31.ma.utexas.edu
lab32.ma.utexas.edu
lab33.ma.utexas.edu
lab34.ma.utexas.edu
lab35.ma.utexas.edu
lab36.ma.utexas.edu
lab37.ma.utexas.edu
lab38.ma.utexas.edu
lab39.ma.utexas.edu
lab3.ma.utexas.edu
lab40.ma.utexas.edu
lab41.ma.utexas.edu
lab42.ma.utexas.edu
lab43.ma.utexas.edu
lab44.ma.utexas.edu
lab45.ma.utexas.edu
lab46.ma.utexas.edu
lab47.ma.utexas.edu
lab48.ma.utexas.edu
lab49.ma.utexas.edu
lab4.ma.utexas.edu
lab50.ma.utexas.edu
lab51.ma.utexas.edu
lab52.ma.utexas.edu
lab53.ma.utexas.edu
lab54.ma.utexas.edu
lab55.ma.utexas.edu
lab56.ma.utexas.edu
lab57.ma.utexas.edu
lab58.ma.utexas.edu
lab59.ma.utexas.edu
lab5.ma.utexas.edu
lab60.ma.utexas.edu
lab61.ma.utexas.edu
lab62.ma.utexas.edu
lab63.ma.utexas.edu
lab64.ma.utexas.edu
lab65.ma.utexas.edu
lab66.ma.utexas.edu
lab67.ma.utexas.edu
lab68.ma.utexas.edu
lab69.ma.utexas.edu
lab6.ma.utexas.edu
lab70.ma.utexas.edu
lab7.ma.utexas.edu
lab8.ma.utexas.edu
lab9.ma.utexas.edu
linux100.ma.utexas.edu
linux104.ma.utexas.edu
linux110.ma.utexas.edu
linux115.ma.utexas.edu
linux119.ma.utexas.edu
linux122.ma.utexas.edu
linux149.ma.utexas.edu
linux14.ma.utexas.edu
linux15.ma.utexas.edu
linux164.ma.utexas.edu
linux169.ma.utexas.edu
linux16.ma.utexas.edu
linux17.ma.utexas.edu
linux180.ma.utexas.edu
linux181.ma.utexas.edu
linux184.ma.utexas.edu
linux18.ma.utexas.edu
linux20.ma.utexas.edu
linux21.ma.utexas.edu
linux24.ma.utexas.edu
linux27.ma.utexas.edu
linux28.ma.utexas.edu
linux29.ma.utexas.edu
linux2.ma.utexas.edu
linux30.ma.utexas.edu
linux31.ma.utexas.edu
linux32.ma.utexas.edu
linux38.ma.utexas.edu
linux40.ma.utexas.edu
linux41.ma.utexas.edu
linux46.ma.utexas.edu
linux4.ma.utexas.edu
linux50.ma.utexas.edu
linux52.ma.utexas.edu
linux54.ma.utexas.edu
linux57.ma.utexas.edu
linux62.ma.utexas.edu
linux64.ma.utexas.edu
linux66.ma.utexas.edu
linux68.ma.utexas.edu
linux69.ma.utexas.edu
linux70.ma.utexas.edu
linux71.ma.utexas.edu
linux72.ma.utexas.edu
linux74.ma.utexas.edu
linux76.ma.utexas.edu
linux79.ma.utexas.edu
linux80.ma.utexas.edu
linux82.ma.utexas.edu
linux83.ma.utexas.edu
linux86.ma.utexas.edu
linux91.ma.utexas.edu
linux92.ma.utexas.edu
linux96.ma.utexas.edu
linux9.ma.utexas.edu

View File

@@ -0,0 +1,19 @@
#!/usr/bin/python
# one complex_anosov command per grid point: the real part i/denom sweeps
# [-radius, radius], the imaginary part j/denom sweeps [0, radius]; (0,0) is skipped
wordlength = 16
n = 101038
res = 50
radius = 1.0
q = [1,1,1]
denom = round(res/radius)
cmd = "IDLIST=./output/idlist_{len} ./complex_anosov summary {n} {q1} {q2} {q3} {rnum}/{rden} {inum}/{iden}"
for i in range(-res,res+1):
    for j in range(0,res+1):
        if i == 0 and j == 0:
            continue
        print(cmd.format(
            len=wordlength, n=n, q1=q[0], q2=q[1], q3=q[2],
            rnum=i, inum=j, rden=denom, iden=denom))
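Piped into a file named "commands", this output becomes the job list that runjobs.py (added further down in this commit) works through. For reference, the grid has 101 * 51 - 1 = 5150 points; a minimal sketch (not part of the commit, same parameters assumed) that reproduces the count and the first emitted command line:

# sketch: rebuild the grid the generator above walks and show its first command
res, denom, n, wordlength, q = 50, 50, 101038, 16, (1, 1, 1)
grid = [(i, j) for i in range(-res, res + 1)
               for j in range(0, res + 1) if (i, j) != (0, 0)]
print(len(grid))  # 101 * 51 - 1 = 5150 command lines
i, j = grid[0]    # (-50, 0)
print("IDLIST=./output/idlist_{} ./complex_anosov summary {} {} {} {} {}/{} {}/{}".format(
    wordlength, n, *q, i, denom, j, denom))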

View File

@@ -1,5 +0,0 @@
linux50 slots=4
linux52 slots=4
linux57 slots=4
linux110 slots=4
linux115 slots=4

View File

@@ -1,48 +0,0 @@
linux100 slots=4
linux104 slots=4
linux110 slots=4
linux122 slots=4
linux149 slots=4
linux14 slots=4
linux15 slots=2
linux16 slots=4
linux17 slots=2
linux180 slots=4
linux181 slots=2
linux184 slots=4
linux18 slots=4
linux20 slots=4
linux21 slots=4
linux24 slots=4
linux27 slots=4
linux29 slots=4
linux2 slots=4
linux30 slots=4
linux31 slots=4
linux32 slots=4
linux38 slots=2
linux40 slots=4
linux41 slots=4
linux46 slots=4
linux4 slots=4
linux50 slots=4
linux52 slots=4
linux54 slots=4
linux57 slots=4
linux62 slots=2
linux64 slots=4
linux68 slots=4
linux69 slots=4
linux70 slots=4
linux71 slots=4
linux72 slots=4
linux74 slots=4
linux76 slots=4
linux79 slots=4
linux80 slots=4
linux83 slots=4
linux86 slots=4
linux91 slots=4
linux92 slots=4
linux96 slots=4
linux9 slots=2

View File

@@ -1,51 +0,0 @@
linux100
linux104
linux110
linux115
linux122
linux149
linux14
linux15
linux164
linux169
linux16
linux17
linux180
linux181
linux184
linux18
linux20
linux21
linux24
linux27
linux29
linux2
linux30
linux31
linux32
linux38
linux40
linux41
linux46
linux4
linux50
linux52
linux54
linux57
linux62
linux64
linux68
linux69
linux70
linux71
linux72
linux74
linux76
linux79
linux80
linux83
linux86
linux91
linux92
linux96
linux9

View File

@@ -1,409 +0,0 @@
#include "parallel.h"
#include <mpi.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <malloc.h>
#include <stdlib.h>
//#define DEBUG INFO
#define DEBUG(msg, ...)
#define INFO(msg, ...) fprintf(stderr, "[%003d%10.3f] " msg, mpi_rank(0), runtime(), ##__VA_ARGS__)
//#define DEBUG(msg, ...) fprintf(stderr, "[ %10.3f] " msg, runtime(), ##__VA_ARGS__)
//#define DEBUG_MPI(msg, node, ...) fprintf(stderr, "[%003d%10.3f] " msg, node, runtime(), ##__VA_ARGS__)
#define DONE(x) *((int*)(x))
enum message_tag {
PARALLEL_ORDER = 0,
PARALLEL_RESULT,
PARALLEL_SHUTDOWN,
PARALLEL_GLOBAL_DATA
};
struct timespec starttime;
int mpi_rank(int activate_mpi)
{
static int active = 0;
if(activate_mpi)
active = 1;
if(!active)
return 0;
else {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
}
void start_timer()
{
clock_gettime(CLOCK_MONOTONIC, &starttime);
}
double runtime()
{
struct timespec curtime;
double diff;
clock_gettime(CLOCK_MONOTONIC, &curtime);
return (curtime.tv_sec - starttime.tv_sec) + (curtime.tv_nsec - starttime.tv_nsec) / 1e9;
}
parallel_context *parallel_init()
{
parallel_context *ctx = malloc(sizeof(parallel_context));
if(!getenv("OMPI_COMM_WORLD_SIZE")) {
ctx->mpi_mode = 0;
DEBUG("Running standalone.\n");
return ctx;
}
ctx->mpi_mode = 1;
int result = MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &ctx->size);
MPI_Comm_rank(MPI_COMM_WORLD, &ctx->rank);
MPI_Get_processor_name(ctx->processor_name, &ctx->processor_name_len);
mpi_rank(1); // display the rank in debug output from now on
if(ctx->rank == 0)
DEBUG("Running in mpi mode, %d nodes.\n", ctx->size);
return ctx;
}
void parallel_destroy(parallel_context* ctx)
{
if(ctx->mpi_mode) {
MPI_Type_free(&ctx->order_datatype);
MPI_Type_free(&ctx->result_datatype);
MPI_Finalize();
}
free(ctx);
}
void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size)
{
ctx->init = init;
ctx->destroy = destroy;
ctx->job = job;
ctx->global_data_size = global_data_size;
ctx->node_data_size = node_data_size;
ctx->input_size = input_size;
ctx->output_size = output_size;
if(ctx->mpi_mode) {
// create a datatype for job orders, consisting of an integer (the job id) and a user-defined section
int order_blocklengths[2] = {1, input_size};
MPI_Aint order_displacements[2] = {0, sizeof(int)};
MPI_Datatype order_types[2] = {MPI_INT, MPI_BYTE};
MPI_Type_create_struct(2, order_blocklengths, order_displacements, order_types, &ctx->order_datatype);
MPI_Type_commit(&ctx->order_datatype);
int result_blocklengths[2] = {1, output_size};
MPI_Aint result_displacements[2] = {0, sizeof(int)};
MPI_Datatype result_types[2] = {MPI_INT, MPI_BYTE};
MPI_Type_create_struct(2, result_blocklengths, result_displacements, result_types, &ctx->result_datatype);
MPI_Type_commit(&ctx->result_datatype);
}
}
int parallel_job(parallel_context *ctx, const void *global_data, void *node_data, int block)
{
MPI_Status status;
double jobtime;
void *input_and_job_nr = malloc(ctx->input_size + sizeof(int));
void *output_and_job_nr = malloc(ctx->output_size + sizeof(int));
void *input = input_and_job_nr + sizeof(int);
int *job_nr = (int *)input_and_job_nr;
void *output = output_and_job_nr + sizeof(int);
int *output_job_nr = (int *)output_and_job_nr;
int result = 0;
int message_present;
if(block) {
jobtime = -MPI_Wtime();
MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
&status);
jobtime += MPI_Wtime();
INFO("TIMING: Probe() took %f seconds\n", jobtime);
message_present = 1;
} else {
MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD,
&message_present, &status);
}
// DEBUG("Message received: source = %d, tag = %d\n", status.MPI_SOURCE, status.MPI_TAG);
if(message_present) {
if(status.MPI_TAG == PARALLEL_SHUTDOWN) {
DEBUG("Shutting down\n");
result = 1;
} else if(status.MPI_TAG == PARALLEL_ORDER) {
MPI_Recv(input_and_job_nr,
1, ctx->order_datatype,
0, PARALLEL_ORDER, MPI_COMM_WORLD,
&status);
DEBUG("Working on job %d\n", *job_nr);
jobtime = -MPI_Wtime();
// do the actual work
ctx->job(global_data, node_data, input, output);
jobtime += MPI_Wtime();
INFO("TIMING: job %d took %f seconds\n", *job_nr, jobtime);
*output_job_nr = *job_nr;
jobtime = -MPI_Wtime();
MPI_Send(output_and_job_nr,
1, ctx->result_datatype,
0, PARALLEL_RESULT, MPI_COMM_WORLD);
jobtime += MPI_Wtime();
INFO("TIMING: Send() took %f seconds\n", jobtime);
}
} else {
result = 2;
}
free(input_and_job_nr);
free(output_and_job_nr);
return result;
}
int parallel_work(parallel_context *ctx)
{
// do nothing in non-mpi mode
if(ctx->mpi_mode == 0)
return 0;
void *global_data = malloc(ctx->global_data_size);
void *node_data = malloc(ctx->node_data_size);
// wait for global data
MPI_Bcast(global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
DEBUG("Global data received\n");
// initialize node_data (and do once-per-node computation)
ctx->init(global_data, node_data);
DEBUG("Initialization completed\n");
int shutdown = 0;
while(shutdown != 1) {
shutdown = parallel_job(ctx, global_data, node_data, 1);
}
ctx->destroy(global_data, node_data);
free(global_data);
free(node_data);
return 0;
}
int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *_restart_filename)
{
// in non-mpi-mode, just run init, forall(jobs) job
if(ctx->mpi_mode == 0) {
void *node_data = malloc(ctx->node_data_size);
int result = ctx->init(global_data, node_data);
if(result != 0)
goto cleanup_standalone;
for(int i = 0; i < njobs; i++) {
result = ctx->job(
global_data,
node_data,
input_array + ctx->input_size*i,
output_array + ctx->output_size*i);
if(result != 0)
goto cleanup_standalone;
}
cleanup_standalone:
ctx->destroy(global_data, node_data);
return result;
} else {
// if no restart file was specified, pick a filename
char *restart_filename;
char buffer[128];
int restartf;
if(_restart_filename == NULL) {
time_t t = time(NULL);
struct tm *loctm = localtime(&t);
strftime(buffer, sizeof(buffer), "restart/restart_%y%m%d_%H%M%S", loctm);
restart_filename = buffer;
} else {
restart_filename = (char *)_restart_filename;
}
// open restart file if it exists, otherwise create it
int continuing = 1;
restartf = open(restart_filename, O_RDWR);
if(restartf == -1 && errno == ENOENT) {
restartf = open(restart_filename, O_RDWR | O_CREAT, 0666);
continuing = 0;
}
if(restartf == -1) {
DEBUG("Error opening restart file: %s\n", strerror(errno));
exit(1);
}
// map restart file
int itemsize = (ctx->output_size + sizeof(int)); // for every job, store output, and completed flag
ftruncate(restartf, njobs*itemsize);
void *alljobs = mmap(0, njobs*itemsize, PROT_READ | PROT_WRITE, MAP_SHARED, restartf, 0);
if(alljobs == MAP_FAILED) {
DEBUG("Error mapping restart file: %s\n", strerror(errno));
exit(1);
}
// count completed jobs, or initialize jobs
int completed = 0;
if(continuing) {
for(int i = 0; i < njobs; i++)
if(DONE(alljobs + i*itemsize))
completed++;
} else {
for(int i = 0; i < njobs; i++) {
DONE(alljobs + i*itemsize) = 0;
memcpy(alljobs + i*itemsize + sizeof(int), input_array + i*ctx->input_size, ctx->input_size); // copy input data
}
}
fsync(restartf);
if(continuing) {
INFO("Continuing from restart file, %d/%d jobs completed, %d nodes\n", completed, njobs, ctx->size);
} else {
INFO("Starting from scratch, %d jobs, %d nodes\n", njobs, ctx->size);
}
if(completed >= njobs)
goto cleanup_mpi;
/* Send global data */
MPI_Bcast((void*)global_data, ctx->global_data_size, MPI_BYTE, 0, MPI_COMM_WORLD);
DEBUG("Global data sent\n");
// we want to be able to run jobs ourselves, so initialize node_data
void *node_data = malloc(ctx->node_data_size);
ctx->init(global_data, node_data);
void *input_message_buffer = malloc(ctx->input_size + sizeof(int));
void *output_message_buffer = malloc(ctx->output_size + sizeof(int));
int *active_jobs = malloc(sizeof(int)*ctx->size);
memset(active_jobs, 0, ctx->size*sizeof(int));
int active_worker_nodes = ctx->size - 1; // we don't count ourselves, since we can't shut ourselves down
// find next unfinished job
int current = 0;
while(current < njobs && DONE(alljobs + current*itemsize))
current++;
// assign initial jobs, 2 for each worker thread
for(int i = 0; i < 2*ctx->size; i++) {
if(current >= njobs) // all jobs are assigned
break;
// send job id and input data
// send to all nodes except ourself (node 0)
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
i%ctx->size, PARALLEL_ORDER, MPI_COMM_WORLD);
DEBUG("Job %d sent to node %d\n", current, i%ctx->size);
active_jobs[i%ctx->size]++;
current++;
}
MPI_Status status;
int message_present;
while(active_jobs[0] != 0 || active_worker_nodes != 0) {
MPI_Iprobe(MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &message_present, &status);
DEBUG("Message present, tag = %d, source = %d\n", status.MPI_TAG, status.MPI_SOURCE);
if(!message_present) {
// if there are no more messages to process, we can run a job ourselves before returning to managing
DEBUG("Start running job myself\n");
int result = parallel_job(ctx, global_data, node_data, 0);
DEBUG("Finished running job myself, result = %d\n");
} else if(status.MPI_TAG == PARALLEL_RESULT) {
MPI_Recv(output_message_buffer, 1, ctx->result_datatype,
MPI_ANY_SOURCE, PARALLEL_RESULT, MPI_COMM_WORLD, &status);
DEBUG("Got message tag %d from node %d\n", status.MPI_TAG, status.MPI_SOURCE);
int node = status.MPI_SOURCE;
int id = *((int*)output_message_buffer);
memcpy(alljobs + id*itemsize + sizeof(int), output_message_buffer + sizeof(int), ctx->output_size);
DONE(alljobs + id*itemsize) = 1;
active_jobs[node]--;
// todo: deal with unresponsive nodes
// strategy: when no jobs left, go through unfinished list again, incrementing oversubscribe counter
// if oversubscribe counter is at limit, shut node down instead
//
if(current >= njobs) { // all jobs are assigned, try to shut down node
// don't try to shut down ourselves, and only if it has no other jobs to do
if(node != 0 && active_jobs[node] == 0) {
MPI_Send(NULL, 0, MPI_BYTE, node, PARALLEL_SHUTDOWN, MPI_COMM_WORLD);
active_worker_nodes--;
INFO("job %d completed by node %d, shut down, %d workers remaining\n", id, node, active_worker_nodes);
}
} else {
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
node, PARALLEL_ORDER, MPI_COMM_WORLD);
active_jobs[node]++;
current++;
if(current < njobs && active_jobs[node] < 3) { // only if another job is actually left
*((int*)input_message_buffer) = current;
memcpy(input_message_buffer + sizeof(int), input_array + current*ctx->input_size, ctx->input_size);
MPI_Send(input_message_buffer, 1, ctx->order_datatype,
node, PARALLEL_ORDER, MPI_COMM_WORLD);
active_jobs[node]++;
current++;
INFO("job %d completed by node %d, continues with %d and %d\n", id, node, current-1, current-2);
} else {
INFO("job %d completed by node %d, continues with %d\n", id, node, current-1);
}
}
}
}
for(int i = 0; i < njobs; i++) {
memcpy(output_array + i*ctx->output_size, alljobs + i*itemsize + sizeof(int), ctx->output_size);
}
free(input_message_buffer);
free(output_message_buffer);
free(node_data);
free(active_jobs);
cleanup_mpi:
munmap(alljobs, njobs*itemsize);
close(restartf);
}
return 0;
}
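The restart file mapped by parallel_run above stores, for every job, an int completion flag followed by output_size bytes reserved for the job's result. A hypothetical Python helper (the name, and the 4-byte native int, are assumptions and not part of this code) for counting finished jobs in such a file:

import struct

def count_done(path, output_size):
    # restart file layout per job: [int done flag][output_size bytes of result]
    itemsize = 4 + output_size  # assumes a 4-byte native int, as on x86-64 Linux
    with open(path, "rb") as f:
        data = f.read()
    return sum(1 for off in range(0, len(data), itemsize)
               if struct.unpack_from("i", data, off)[0])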

View File

@@ -1,118 +0,0 @@
#ifndef PARALLEL_H
#define PARALLEL_H
/*
this is a library to parallelize workloads which can be split up naturally
into a sequence of independent jobs, using MPI. A program will usually
- do precomputation
- fill array with input data
- do the parallel work
- print the output data
we want to enable restarts, so that only unfinished jobs need to be repeated.
Further, we want to be resilient to slow/unreliable network and to losing
nodes. There is a main node and a number of workers. The main node does the
precomputation and then retires to do administrative work, and the workers
do the actual jobs. We also want to switch to serial mode if the program is
called without MPI.
The following data has to be transmitted between nodes:
- results of the precomputation (read-only, shared between nodes)
- job-specific input data, generated by main node before parallel part
- output data for each job
the parallel work shall be given as a callback function which takes
input data and precomputation data as parameter
the above program will look like this for us:
- parallel_init
- if we are a worker, do parallel_work(init_callback, job_callback), exit
- do precomputation
- fill array with input data
- output_array = parallel_run(input_array)
- print the output data
parallel_init:
- check if we're running as an mpi program
- init mpi, check what kind of node we are
parallel_work(init_callback1, init_callback2, job_callback):
- receive global_precomp (???)
- worker_precomp = init_callback2(global_precomp, worker_precomp)
- infinite loop:
- wait for job on network, receive input
- output = job_callback(global_precomp, worker_precomp, input)
- send output on network
- exit loop on shutdown signal
parallel_run(global_precomp, input_array, restart file, callbacks):
- check if we're running as an MPI program
- send global_precomp to all nodes (if MPI)
- if(restart file given and exists) read restart file
- else create new restart file
- until(all jobs finished):
- if MPI:
- send next job & input to appropriate node
- if all jobs are in work, reassign unfinished ones (up to limit)
- collect outputs
- if no MPI:
- worker_precomp = init_callback1
- worker_precomp = init_callback2(global_precomp, worker_precomp)
- for(j in jobs)
- output(j) = job_callback(global_precomp, worker_precomp, input(j))
- delete restart file
- return array of outputs
parallel_destroy():
- free everything
have a context? probably yes: parallel_context
plan:
- make interface
- implement no-MPI part
- restructure singular_values.c to use interface
- implement MPI part
*/
#include <mpi.h>
#include <time.h>
typedef void (*parallel_callback_destroy)(const void*, void*);
typedef int (*parallel_callback_init)(const void*,void*);
typedef int (*parallel_callback_job)(const void*,void*,const void*,void*);
typedef struct {
int mpi_mode;
struct timespec starttime;
char processor_name[MPI_MAX_PROCESSOR_NAME];
int processor_name_len;
int rank;
int size;
MPI_Datatype order_datatype;
MPI_Datatype result_datatype;
parallel_callback_init init;
parallel_callback_job job;
parallel_callback_destroy destroy;
void *global_data;
void *node_data;
int global_data_size;
int node_data_size;
int input_size;
int output_size;
} parallel_context;
parallel_context *parallel_init();
void parallel_set_datasize_and_callbacks(parallel_context *ctx, parallel_callback_init init, parallel_callback_job job, parallel_callback_destroy destroy, int global_data_size, int node_data_size, int input_size, int output_size);
int parallel_work(parallel_context *ctx);
int parallel_run(parallel_context *ctx, const void *global_data, const void *input_array, void *output_array, unsigned int njobs, const char *restart_filename);
void parallel_destroy(parallel_context* ctx);
int mpi_rank(int activate_mpi);
void start_timer();
double runtime();
#endif
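The comment block at the top of this header lays out the order/result/shutdown message flow between the main node and the workers. For illustration only, a rough mpi4py sketch of that flow (hypothetical, not part of the repository; run with at least two ranks, e.g. mpirun -np 4 ./sketch.py):

#!/usr/bin/python
# toy master/worker loop mirroring the protocol described above
from mpi4py import MPI

ORDER, RESULT, SHUTDOWN = 0, 1, 2
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
jobs = list(range(20))                    # stand-in for the input array

if rank == 0:                             # main node: hand out jobs, collect results
    results, active = {}, size - 1
    for w in range(1, size):              # one initial order per worker
        if jobs:
            comm.send(jobs.pop(0), dest=w, tag=ORDER)
        else:
            comm.send(None, dest=w, tag=SHUTDOWN)
            active -= 1
    while active > 0:
        st = MPI.Status()
        job, out = comm.recv(source=MPI.ANY_SOURCE, tag=RESULT, status=st)
        results[job] = out
        if jobs:                          # keep the worker busy
            comm.send(jobs.pop(0), dest=st.Get_source(), tag=ORDER)
        else:                             # nothing left: retire the worker
            comm.send(None, dest=st.Get_source(), tag=SHUTDOWN)
            active -= 1
    print(sorted(results.items()))
else:                                     # worker: loop until shutdown
    while True:
        st = MPI.Status()
        job = comm.recv(source=0, tag=MPI.ANY_TAG, status=st)
        if st.Get_tag() == SHUTDOWN:
            break
        comm.send((job, job * job), dest=0, tag=RESULT)   # toy "job_callback"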

View File

@@ -1,9 +0,0 @@
#!/bin/bash
# nmax=895882 # up to reflection group word length 22 ( 555 group)
nmax=700000 # up to reflection group word length 22 ( 444 group)
# nmax=11575 # up to reflection group word length 14
#time mpirun --mca opal_warn_on_missing_libcuda 0 -x LD_LIBRARY_PATH=/home/stecker/svmpi/libs ./singular_values $nmax ejp_trg_restart test.out
time mpirun --mca opal_warn_on_missing_libcuda 0 --mca mpi_yield_when_idle 1 -np 4 ./singular_values 700000 4 4 4 1 1 100 1 100 100 $1

View File

@@ -1,14 +0,0 @@
#!/bin/bash
cd /home/stecker/svmpi/
nmax=895882 # up to reflection group word length 22
# nmax=11575 # up to reflection group word length 14
outfile=result_$(date +%Y%m%d_%H%M%S).out
unset DISPLAY
make singular_values &&
time mpirun -n 100 -x LD_LIBRARY_PATH=/home/stecker/svmpi/libs --hostfile hostfile_big ./singular_values $nmax utexas_cluster_restart $outfile

parallelization/runjobs.py (executable file, 57 lines added)
View File

@@ -0,0 +1,57 @@
#!/usr/bin/python
from mpi4py import MPI
import os
import re
import math
import subprocess
import time
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nodes = comm.Get_size()
# print(os.path.abspath(os.curdir))
# collect indices of commands already finished in earlier runs (done_* files)
done = set()
for f in os.listdir('.'):
    if re.search('^done_[0-9]+', f):
        fp = open(f, "r")
        for x in fp:
            done.add(int(x))
        fp.close()
f = open("commands", "r")
idx = 0
todo = []
for c in f:
if not idx in done:
todo.append((idx,c))
idx = idx+1
f.close()
# static split: each rank takes a contiguous slice, the last rank absorbs the remainder
start = math.floor(len(todo)/nodes*rank)
end = math.floor(len(todo)/nodes*(rank+1))
if(rank == nodes-1):
    end = len(todo)
print("{n:d} commands awaiting execution, {nnode:d} of them in node {rank:d}".format(n=len(todo),nnode=end-start,rank=rank))
time.sleep(1) # to make sure all nodes read the status first before more gets done
outfilename = "result_{node:003d}".format(node=rank)
donefilename = "done_{node:003d}".format(node=rank)
outfile = open(outfilename, "a")
donefile = open(donefilename, "a")
for i in range(start, end):
    result = subprocess.call(todo[i][1], stdout=outfile, shell=True)
    if result == 0:
        donefile.write(str(todo[i][0]) + '\n')
    else:
        print("Command failed: {cmd}".format(cmd=todo[i][1]))
    outfile.flush()
    donefile.flush()
outfile.close()
donefile.close()
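Each rank works through a contiguous slice of the remaining commands: rank r gets the entries from floor(len(todo)/nodes*r) up to floor(len(todo)/nodes*(r+1)), and the last rank takes everything to the end. A small sketch of that split for, say, 5150 commands on 8 ranks (hypothetical numbers):

import math

ntodo, nodes = 5150, 8
for rank in range(nodes):
    start = math.floor(ntodo / nodes * rank)
    end = ntodo if rank == nodes - 1 else math.floor(ntodo / nodes * (rank + 1))
    print(rank, start, end, end - start)   # every rank ends up with 643 or 644 commands

Started as, e.g., mpirun -np 8 ./runjobs.py in a directory containing the commands file, each rank appends its output to its own result_NNN file and records the indices of finished commands in done_NNN, so a rerun after a crash skips the work already done.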

View File

@@ -1,20 +0,0 @@
#!/bin/bash
#SBATCH -J ejp_trg
#SBATCH -o logs/ejp_trg.o%j
#SBATCH -e logs/ejp_trg.e%j
#SBATCH -p skx-dev
#SBATCH -N 1
#SBATCH -n 48
#SBATCH -t 00:05:00
#SBATCH --mail-user=mail@florianstecker.net
#SBATCH --mail-type=all
export LD_LIBRARY_PATH=$WORK/mps/lib:$LD_LIBRARY_PATH
d=$(date +%Y%m%d_%H%M%S)
nmax=895882 # up to reflection group word length 22
# nmax=11575 # up to reflection group word length 14
ibrun ./singular_values $nmax $SCRATCH/ejp_trg_restart $WORK/ejp_trg/output/result_$d

View File

@@ -1,8 +0,0 @@
#!/bin/bash
rsync -vt *.c *.h Makefile stampede.slurm stampede:work/ejp_trg/
#rsync -lvt /usr/lib/libmps.so* /usr/include/mps utexas:work/ejp_trg/libs/
# now run it with a job script
# get MPSolve from https://numpi.dm.unipi.it/_media/software/mpsolve/mpsolve-3.2.1.tar.bz2

View File

@@ -1,7 +0,0 @@
#!/bin/bash
rsync -vt *.c *.h Makefile hostfile hostfile_big allhosts localnames run_utexas run_local utexas:svmpi/
rsync -lvt /usr/lib/libmps.so* utexas:svmpi/libs/
rsync -rvt /usr/include/mps utexas:svmpi/libs/
# now run it with ssh utexas -t ssh linux50 svmpi/run_utexas