#define _XOPEN_SOURCE 700

#include "ompi_partest.h"
#include "partest_opts.h"

#include <common/bgp_personality.h>
#include <common/bgp_personality_inlines.h>
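
/*
 * Named synchronization points, one per benchmark phase (start, malloc,
 * open, write, read, close, unlink). Each simply calls MPI_Barrier() on the
 * given communicator so that the phases can be separated cleanly in timings
 * and traces.
 */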
int barrier_after_start(MPI_Comm comm)   { MPI_Barrier(comm); return (1); }
int barrier_after_malloc(MPI_Comm comm)  { MPI_Barrier(comm); return (1); }
int barrier_after_open(MPI_Comm comm)    { MPI_Barrier(comm); return (1); }
int barrier_after_write(MPI_Comm comm)   { MPI_Barrier(comm); return (1); }
int barrier_after_read(MPI_Comm comm)    { MPI_Barrier(comm); return (1); }
int barrier_after_close(MPI_Comm comm)   { MPI_Barrier(comm); return (1); }
int barrier_before_unlink(MPI_Comm comm) { MPI_Barrier(comm); return (1); }
int barrier_after_unlink(MPI_Comm comm)  { MPI_Barrier(comm); return (1); }
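
/*
 * collective_print_gather(): each OpenMP thread deposits one MAXCHARLEN
 * message in a shared buffer; the messages of all threads of all MPI ranks
 * are then gathered onto rank 0 of 'comm' and printed to stderr in
 * rank/thread order.
 */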
static char *__collective_print_pointer;

int collective_print_gather(char *cbuffer, MPI_Comm comm)

  int num_threads = omp_get_num_threads();

  __collective_print_pointer = (char *) malloc(MAXCHARLEN * num_threads);

  memcpy(__collective_print_pointer + (MAXCHARLEN * omp_get_thread_num()), cbuffer, MAXCHARLEN);

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  if (rank == 0) lbuffer = (char *) malloc(MAXCHARLEN * num_threads * size);

  MPI_Gather(__collective_print_pointer, MAXCHARLEN * num_threads, MPI_CHAR, lbuffer, MAXCHARLEN * num_threads, MPI_CHAR, 0, comm);

  for (p = 0; p < (size * num_threads); p++) {
    fprintf(stderr, "%s", lbuffer + p * MAXCHARLEN);

  if (rank == 0) free(lbuffer);

  free(__collective_print_pointer);
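
/*
 * reduce_omp(): thread-level reduction helper. Every OpenMP thread stores
 * its value of 'syncdata' in a shared scratch array indexed by the thread
 * number; the entries are then combined into '*out' with the requested
 * operation (MPI_SUM or MPI_MAX) for the element type given by 'dtype'.
 */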
static void *__thread_sync_pointer;

void reduce_omp(void *syncdata, void *out, MPI_Op op, int dtype)

  int thread_num = omp_get_thread_num();
  int num_threads = omp_get_num_threads();

  switch (dtype) {
  case _PARTEST_SION_INT32:
    __thread_sync_pointer = malloc(sizeof(sion_int32) * num_threads);
    break;
  case _PARTEST_SION_INT64:
    __thread_sync_pointer = malloc(sizeof(sion_int64) * num_threads);
    break;
  case _PARTEST_DOUBLE:
    __thread_sync_pointer = malloc(sizeof(double) * num_threads);
    break;
  default:
    __thread_sync_pointer = malloc(sizeof(sion_int64) * num_threads);
    break;
  }

  switch (dtype) {
  case _PARTEST_SION_INT32:
    ((sion_int32 *)__thread_sync_pointer)[thread_num] = *((sion_int32 *)syncdata);
    break;
  case _PARTEST_SION_INT64:
    ((sion_int64 *)__thread_sync_pointer)[thread_num] = *((sion_int64 *)syncdata);
    break;
  case _PARTEST_DOUBLE:
    ((double *)__thread_sync_pointer)[thread_num] = *((double *)syncdata);
    break;
  default:
    ((sion_int64 *)__thread_sync_pointer)[thread_num] = *((sion_int64 *)syncdata);
    break;
  }
  switch (dtype) {
  case _PARTEST_SION_INT32:
    if (op == MPI_SUM) {
      *((sion_int32 *) out) = 0;
      for (i = 0; i < num_threads; i++) {
        *((sion_int32 *) out) += ((sion_int32 *)__thread_sync_pointer)[i];
      }
    }
    else if (op == MPI_MAX) {
      *((sion_int32 *) out) = ((sion_int32 *)__thread_sync_pointer)[0];
      for (i = 1; i < num_threads; i++) {
        if (((sion_int32 *)__thread_sync_pointer)[i] > *((sion_int32 *) out)) *((sion_int32 *) out) = ((sion_int32 *)__thread_sync_pointer)[i];
      }
    }
    break;
  case _PARTEST_SION_INT64:
    if (op == MPI_SUM) {
      *((sion_int64 *) out) = 0;
      for (i = 0; i < num_threads; i++) {
        *((sion_int64 *) out) += ((sion_int64 *)__thread_sync_pointer)[i];
      }
    }
    else if (op == MPI_MAX) {
      *((sion_int64 *) out) = ((sion_int64 *)__thread_sync_pointer)[0];
      for (i = 1; i < num_threads; i++) {
        if (((sion_int64 *)__thread_sync_pointer)[i] > *((sion_int64 *) out)) *((sion_int64 *) out) = ((sion_int64 *)__thread_sync_pointer)[i];
      }
    }
    break;
  case _PARTEST_DOUBLE:
    if (op == MPI_SUM) {
      *((double *) out) = 0;
      for (i = 0; i < num_threads; i++) {
        *((double *) out) += ((double *)__thread_sync_pointer)[i];
      }
    }
    else if (op == MPI_MAX) {
      *((double *) out) = ((double *)__thread_sync_pointer)[0];
      for (i = 1; i < num_threads; i++) {
        if (((double *)__thread_sync_pointer)[i] > *((double *) out)) *((double *) out) = ((double *)__thread_sync_pointer)[i];
      }
    }
    break;
  default:
    if (op == MPI_SUM) {
      *((sion_int64 *) out) = 0;
      for (i = 0; i < num_threads; i++) {
        *((sion_int64 *) out) += ((sion_int64 *)__thread_sync_pointer)[i];
      }
    }
    else if (op == MPI_MAX) {
      *((sion_int64 *) out) = ((sion_int64 *)__thread_sync_pointer)[0];
      for (i = 1; i < num_threads; i++) {
        if (((sion_int64 *)__thread_sync_pointer)[i] > *((sion_int64 *) out)) *((sion_int64 *) out) = ((sion_int64 *)__thread_sync_pointer)[i];
      }
    }
    break;
  }

  free(__thread_sync_pointer);
int split_communicator(_test_communicators *communicators, int bluegene, int bluegene_np, int numfiles, int read_task_offset, int verbose);
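
/*
 * main(): parse and distribute the command-line options, derive the working
 * communicators via split_communicator(), print the run parameters on the
 * first work task, and execute the selected test variant inside an OpenMP
 * parallel region.
 */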
int main(int argc, char **argv)

  int rank, size, rc = 0;

  sion_int64 commwork_size64 = 1;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  DPRINTFTS(rank, "after MPI_Init");

  init_options(&options);

  parse_options_std(argc, argv, &options);

  rc = parse_options_long(argc, argv, &options);

  MPI_Bcast(&rc, 1, MPI_INT, 0, MPI_COMM_WORLD);

  MPI_Abort(MPI_COMM_WORLD, 1);

  distribute_options_mpi(&options);

  communicators.all = MPI_COMM_WORLD;
  MPI_Comm_size(MPI_COMM_WORLD, &communicators.all_size);
  MPI_Comm_rank(MPI_COMM_WORLD, &communicators.all_rank);
  split_communicator(&communicators, options.bluegene, options.bluegene_np, options.numfiles, options.read_task_offset, options.verbose);

  sion_int64 num_threads = (sion_int64) omp_get_max_threads();
  MPI_Allreduce(&num_threads, &commwork_size64, 1, SION_MPI_INT64, MPI_SUM, communicators.work);
  if (options.globalsize > 0) {
    options.totalsize = (sion_int64) options.globalsize / commwork_size64;
  }
  else {
    options.globalsize = options.totalsize * commwork_size64;
  }
  if ((options.totalsize > options.bufsize) || (options.read_task_offset > 0) || (options.do_write == 0)) {
    options.suppress_checksum = 1;
  }

  if (options.fsblksize < 0) options.fsblksize = -1;
  if ((communicators.work_size > 0) && (communicators.work_rank == 0)) {

    fprintf(stderr, "------------------------------------------------------------------------------------------\n");
    fprintf(stderr, "SION parallel file I/O benchmark 'ompi_partest': start at %s", ctime(&t));
    fprintf(stderr, "partest Number of MPI tasks that will use the file tasks: running on %d tasks\n", size);
    fprintf(stderr, "------------------------------------------------------------------------------------------\n");

    fprintf(stderr, "partest parameter: CHECKSUM DISABLED!\n\n");

    if (options.suppress_checksum) {
      fprintf(stderr, "partest parameter: CHECKSUM not possible, DISABLED!\n\n");
    }

    fprintf(stderr, "partest parameter: (-f) datafile = %s\n", options.filename);
    fprintf(stderr, "partest parameter: (-n) number of files = %d\n", options.numfiles);
    fprintf(stderr, "partest parameter: (-F) random factor = %13.4f\n", options.factor);
    fprintf(stderr, "partest parameter: (-X) remove files after test = %d\n", options.unlink_files);
    fprintf(stderr, "partest parameter: (-b/-B) local buffer size / sion task = %15lld bytes %10.3f MB\n", options.bufsize, options.bufsize / (1.0 MB));
    fprintf(stderr, "partest parameter: (-g/-G) global total data size = %15lld bytes %10.3f GB\n", options.globalsize, options.globalsize / (1024.0 MB));
    fprintf(stderr, "partest parameter: (-s/-S) total data size / sion task = %15lld bytes %10.3f MB\n", options.totalsize, options.totalsize / (1.0 MB));
    fprintf(stderr, "partest parameter: (-r/-R) sion chunk size = %15lld bytes %10.3f MB\n", options.chunksize, options.chunksize / (1.0 MB));
    fprintf(stderr, "partest parameter: (-Q) fs block size = %15d bytes %10.3f MB\n", options.fsblksize, options.fsblksize / (1.0 MB));
    if (options.type == 0)
      fprintf(stderr, "partest parameter: (-T) test type = %d (sion OMPI, collective read)\n", options.type);
    if (options.type == 1)
      fprintf(stderr, "partest parameter: (-T) test type = %d (sion OMPI, independent read)\n", options.type);
    if (options.type == 2)
      fprintf(stderr, "partest parameter: (-T) test type = %d (sion OMP, collective read)\n", options.type);
    if (options.type == 3)
      fprintf(stderr, "partest parameter: (-T) test type = %d (sion OMP, independent read)\n", options.type);
    fprintf(stderr, "partest parameter: (-j) serialize_blocknum = %d\n", options.serialize_blocknum);
    fprintf(stderr, "partest parameter: (-Z) read task offset = %d\n", options.read_task_offset);
    fprintf(stderr, "partest parameter: (-o) start offset bytes = %d\n", options.startoffset);
    fprintf(stderr, "partest parameter: (-v) verbose = %d\n", options.verbose);
    fprintf(stderr, "partest parameter: (-d) debug = %d\n", options.debug);
    fprintf(stderr, "partest parameter: (-D) Debug = %d\n", options.Debug);
    fprintf(stderr, "partest parameter: (-M) collective write = %d\n", options.collectivewrite);
    fprintf(stderr, "partest parameter: (-m) collective read = %d\n", options.collectiveread);

    fprintf(stderr, "partest parameter: (-P) Blue Gene, I/O nodes = %d\n", options.bluegene);
    fprintf(stderr, "partest parameter: ( ) Blue Gene: tasks/IO-node = %d\n", options.bluegene_np);
    fprintf(stderr, "partest parameter: (-w) MPI-IO, IBM, Large Block IO = %d\n", options.mpiio_lb);
    fprintf(stderr, "partest parameter: (-W) MPI-IO, IBM, IO bufsize = %d KB\n", options.mpiio_bs);
    fprintf(stderr, "partest parameter: (-x) MPI-IO, IBM, sparse access = %d\n", options.mpiio_sa);
    fprintf(stderr, "partest parameter: ( ) OpenMP number of threads = %d\n", omp_get_max_threads());
    fprintf(stderr, "partest parameter: ( ) commwork_size64 = %lld\n", commwork_size64);
    fprintf(stderr, "partest parameter: ( ) suppress_checksum = %d\n", options.suppress_checksum);
    fprintf(stderr, "partest parameter: ( ) do_write = %d\n", options.do_write);
    fprintf(stderr, "partest parameter: ( ) do_read = %d\n", options.do_read);
    fprintf(stderr, "partest parameter: ( ) use_posix = %d\n", options.use_posix);
  }
  barrier_after_start(MPI_COMM_WORLD);
  if ((communicators.work_size > 0) && (communicators.work_rank == 0)) {
    fprintf(stderr, "partest parameter: ( ) comm(all) = %d of %d\n", communicators.all_rank, communicators.all_size);
    fprintf(stderr, "partest parameter: ( ) comm(work) = %d of %d\n", communicators.work_rank, communicators.work_size);
    fprintf(stderr, "partest parameter: ( ) comm(local) = %d of %d\n", communicators.local_rank, communicators.local_size);
    fprintf(stderr, "------------------------------------------------------------------------------------------\n");
  }
#pragma omp parallel private(localbuffer,t)

    barrier_after_start(MPI_COMM_WORLD);
    char l_filename[MAXCHARLEN];
    strcpy(l_filename, options.filename);

    DPRINTFTS(rank, "after pstart");

    localbuffer = (char *) malloc(options.bufsize);

    srand(time(NULL) * local_communicators.work_rank * omp_get_thread_num());

    memset(localbuffer, 'a' + rank % 26, local_options.bufsize);
    barrier_after_malloc(MPI_COMM_WORLD);
    DPRINTFTS(rank, "after malloc");
    if (local_options.factor > 0.0) {
      if ((local_options.collectivewrite) || (local_options.collectiveread)) {
        if (local_options.bufsize < local_options.totalsize * (1 + local_options.factor)) {

          fprintf(stderr, "partest: ERROR deadlock possible if collective read/write and random factor are used and the buffer is too small, aborting\n");
          MPI_Abort(MPI_COMM_WORLD, 0);

      local_options.totalsize += ((sion_int64) (local_options.factor * (sion_int64) local_options.totalsize * (sion_int64) rand() / (sion_int64) RAND_MAX));
      local_options.chunksize += ((sion_int64) (local_options.factor * (sion_int64) local_options.totalsize * (sion_int64) rand() / (sion_int64) RAND_MAX));
      fprintf(stderr, "partest parameter: ( ) new totalsize[%4d,t%4d] = %lld\n", local_communicators.work_rank, omp_get_thread_num(), local_options.totalsize);
    DPRINTFTS(rank, "before scall2");

    if ((local_communicators.work_size > 0) && (local_communicators.work_rank == 0)) {
      fprintf(stderr, "partest parameter: ( ) new totalsize = %lld\n", local_options.totalsize);
    }

    barrier_after_malloc(MPI_COMM_WORLD);
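
    /* Select the test variant (-T): 0/1 run the hybrid MPI+OpenMP (OMPI)
     * code path, 2/3 the pure OpenMP path; even types open collectively for
     * the read phase, odd types open independently. */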
    if (local_options.type == 0) {
      local_options.collectiveopenforread = 1;
      test_paropen_multi_ompi(l_filename, localbuffer, &local_communicators, &local_options);
    }
    else if (local_options.type == 1) {
      local_options.collectiveopenforread = 0;
      test_paropen_multi_ompi(l_filename, localbuffer, &local_communicators, &local_options);
    }
    else if (local_options.type == 2) {
      local_options.collectiveopenforread = 1;
      test_paropen_omp(l_filename, localbuffer, &local_communicators, &local_options);
    }
    else if (local_options.type == 3) {
      local_options.collectiveopenforread = 0;
      test_paropen_omp(l_filename, localbuffer, &local_communicators, &local_options);
    }
    DPRINTFTS(rank, "before MPI_Finalize");
    barrier_after_malloc(MPI_COMM_WORLD);

  if ((local_communicators.work_size > 0) && (local_communicators.work_rank == 0)) {

    fprintf(stderr, "SION parallel file I/O benchmark 'ompi_partest': end at %s\n", ctime(&t));
int split_communicator(_test_communicators *communicators, int bluegene, int bluegene_np, int numfiles, int read_task_offset, int verbose)

  communicators->work_size = communicators->work_rank = -2;
  communicators->local_size = communicators->local_rank = -2;
  _BGP_Personality_t personality;
  MPI_Comm commSame, commDiff;
  int sizeSame, sizeDiff;
  int rankSame, rankDiff;

  unsigned procid, x, y, z, t;
  char cbuffer[MAXCHARLEN];

  Kernel_GetPersonality(&personality, sizeof(personality));
  BGP_Personality_getLocationString(&personality, location);
  procid = Kernel_PhysicalProcessorID();
  MPIX_rank2torus(communicators->all_rank, &x, &y, &z, &t);
  MPIX_Pset_diff_comm_create(&commDiff);
  MPI_Comm_size(commDiff, &sizeDiff);
  MPI_Comm_rank(commDiff, &rankDiff);
  communicators->ionode_number = rankDiff;

  MPIX_Pset_same_comm_create(&commSame);
  MPI_Comm_size(commSame, &sizeSame);
  MPI_Comm_rank(commSame, &rankSame);

  if (bluegene_np == 0) {
    bluegene_np = sizeSame;
  }
  MPI_Comm_split(communicators->all, (rankSame < bluegene_np), communicators->all_rank, &communicators->work);
  MPI_Comm_size(communicators->work, &communicators->work_size);
  MPI_Comm_rank(communicators->work, &communicators->work_rank);
  if (rankSame >= bluegene_np) {

    communicators->work_size = communicators->work_rank = -1;
    communicators->local_size = communicators->local_rank = -1;

  communicators->local = communicators->work;
  else if (numfiles < 0) {

    MPI_Comm_split(commSame, (rankSame < bluegene_np), communicators->all_rank, &communicators->local);

    communicators->local = commDiff;

  MPI_Comm_size(communicators->local, &communicators->local_size);
  MPI_Comm_rank(communicators->local, &communicators->local_rank);

  if (numfiles == -1) communicators->file_number = rankDiff;
  else communicators->file_number = rankSame;

  communicators->file_number = -1;
  sprintf(cbuffer, "");
  if (rankSame < bluegene_np) {

    sprintf(cbuffer, "BGP[%05d] diff_comm: %4d of %4d same_comm: %5d of %5d file_comm: %5d of %5d %s phys_xyzt(%ud,%ud,%ud,%ud)\n",
            communicators->all_rank, rankDiff + 1, sizeDiff, rankSame + 1, sizeSame, communicators->local_rank + 1, communicators->local_size,
            location, x, y, z, t);

  collective_print_gather(cbuffer, communicators->work);
  if (communicators->work_size == -2) {

    communicators->work = communicators->all;
    MPI_Comm_size(communicators->work, &communicators->work_size);
    MPI_Comm_rank(communicators->work, &communicators->work_rank);

  if (communicators->local_size == -2) {

    communicators->local = communicators->work;
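
  /* Generic (non-Blue Gene) case: split the work communicator into one
   * 'local' group per file; any remainder tasks are attached to the last
   * file. */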
  numfiles = communicators->work_size / 2;

  proc_per_file = communicators->work_size / numfiles;
  if (communicators->work_rank >= (numfiles * proc_per_file)) {
    communicators->file_number = numfiles - 1;
  }
  else {
    communicators->file_number = communicators->work_rank / proc_per_file;
  }
  MPI_Comm_split(communicators->work, communicators->file_number, communicators->all_rank, &communicators->local);

  MPI_Comm_size(communicators->local, &communicators->local_size);
  MPI_Comm_rank(communicators->local, &communicators->local_rank);

  communicators->ionode_number = communicators->file_number;
  gethostname(location, 256);
  char cbuffer[MAXCHARLEN];
  sprintf(cbuffer, "LINUX[%03d] diff_comm: %4d of %4d same_comm: %4d of %4d file_comm: %4d of %4d %s\n",
          communicators->all_rank, communicators->all_rank, communicators->all_size,
          communicators->work_rank, communicators->work_size, communicators->local_rank, communicators->local_size, location);
  collective_print_gather(cbuffer, communicators->all);
  gethostname(location, 256);
  int sizeSame = 0, sizeDiff = 0;
  char cbuffer[MAXCHARLEN];
  sprintf(cbuffer, "AIX[%03d] diff_comm: %4d of %4d same_comm: %4d of %4d file_comm: %4d of %4d %s\n",
          communicators->all_rank, communicators->all_rank, communicators->all_size,
          communicators->work_rank, communicators->work_size, communicators->local_rank, communicators->local_size, location);
  collective_print_gather(cbuffer, communicators->all);
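
  /* Finally create the 'workread' communicator: the same tasks as 'work',
   * but with ranks reordered by read_task_offset so that the read phase is
   * served by a different task than the one that wrote the data. Tasks
   * outside 'work' get dummy (-1) sizes and ranks. */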
  if (communicators->work_size != -1) {

    newtasknr = (communicators->work_rank + read_task_offset) % communicators->work_size;
    MPI_Comm_split(communicators->work, 0, newtasknr, &communicators->workread);

    MPI_Comm_size(communicators->workread, &communicators->workread_size);
    MPI_Comm_rank(communicators->workread, &communicators->workread_rank);
    communicators->workread_size = communicators->workread_rank = -1;
    communicators->local_size = communicators->local_rank = -1;