10 #define _XOPEN_SOURCE 700 26 #include "partest_util.h" 29 test_single_mpi(
char *filename,
38 double starttime, write_starttime, read_starttime, unlinktime, gunlinktime;
39 double timings[TIMINGS_MAX_NUM],ftimings[TIMINGS_MAX_NUM],gtimings[TIMINGS_MAX_NUM];
40 sion_int64 stats[STATS_MAX_NUM], fstats[STATS_MAX_NUM], gstats[STATS_MAX_NUM];
43 sion_int64 bsumwrote, bsumread;
44 size_t bwrite, bwrote, btoread, bread, chunkcnt;
45 double checksum_fp, checksum_read_fp;
46 int ser_count,ser_step,ser_done;
51 if (communicators->work_size == -1) {
56 if(options->use_posix) {
62 for(i=0;i<TIMINGS_MAX_NUM;i++) timings[i]=ftimings[i]=gtimings[i]=0.0;
63 for(i=0;i<STATS_MAX_NUM;i++) stats[i]=fstats[i]=gstats[i]=0;
65 do_debug=(((options->debug && communicators->all_rank == 0)) || ((options->Debug && (communicators->all_rank+1) == communicators->all_size)));
67 sprintf(fname,
"%s.%06d", filename, communicators->work_rank);
69 if(options->do_write) {
74 barrier_after_start(communicators->
local);
76 write_starttime = starttime = MPI_Wtime();
81 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
82 MPI_Abort(communicators->
all, 1);
84 stats[STATS_WR_NUM_FILES]=communicators->work_size;
85 timings[TIMINGS_WR_CREATE] = MPI_Wtime()-starttime;
87 starttime = MPI_Wtime();
88 barrier_after_open(communicators->
work);
89 timings[TIMINGS_WR_CREATE_BARR_OPEN] = MPI_Wtime()-starttime;
91 starttime = MPI_Wtime();
93 timings[TIMINGS_WR_CREATE_CLOSE] = MPI_Wtime()-starttime;
95 starttime = MPI_Wtime();
96 barrier_after_open(communicators->
work);
97 timings[TIMINGS_WR_CREATE_BARR_CLOSE] = MPI_Wtime()-starttime;
100 starttime = MPI_Wtime();
104 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
105 MPI_Abort(communicators->
all, 1);
107 timings[TIMINGS_WR_OPEN] = MPI_Wtime()-starttime;
109 starttime = MPI_Wtime();
110 barrier_after_open(communicators->
work);
111 timings[TIMINGS_WR_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
113 starttime = MPI_Wtime();
114 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
115 else ser_step=communicators->local_size;
117 timings[TIMINGS_WR_WRITE_SYNC] += MPI_Wtime()-starttime;
120 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
122 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
125 starttime = MPI_Wtime();
126 left = options->totalsize;
131 bwrite = options->bufsize;
136 fprintf(stderr,
"timings[%03d] write %lld bytes\n", communicators->all_rank, (sion_int64) bwrite);
141 left -= (sion_int64) bwrote;
142 bsumwrote += (sion_int64) bwrote;
146 if(!options->suppress_checksum) {
148 for (i = 0; i < bwrote; i++)
149 checksum_fp += (
double) localbuffer[i];
154 fprintf(stderr,
"timings[%03d] wrote (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n",
155 communicators->all_rank, (sion_int64) bwrote, bsumwrote, bsumwrote / 1024.0 / 1024.0, (sion_int64) left);
160 timings[TIMINGS_WR_WRITE] += MPI_Wtime()-starttime;
161 stats[STATS_BYTES_WR_WROTE]+=bsumwrote;
162 stats[STATS_BYTES_WR_NUM_CHUNKS]+=chunkcnt;
166 starttime = MPI_Wtime();
167 barrier_after_write(communicators->
local);
168 timings[TIMINGS_WR_WRITE_BARR_FILE] += MPI_Wtime()-starttime;
172 starttime = MPI_Wtime();
173 barrier_after_write(communicators->
work);
174 timings[TIMINGS_WR_WRITE_BARR_GLOBAL] = MPI_Wtime()-starttime;
176 starttime = MPI_Wtime();
178 timings[TIMINGS_WR_CLOSE] = MPI_Wtime()-starttime;
180 starttime = MPI_Wtime();
181 barrier_after_close(communicators->
work);
182 timings[TIMINGS_WR_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
184 timings[TIMINGS_WR_TOTAL] = MPI_Wtime()-write_starttime;
187 if (options->verbose) {
188 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,1);
192 write_timings(
"TASK",TIMINGS_METHOD_WRITE,timings,stats,communicators,options,0);
195 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
196 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
work);
198 if (communicators->work_rank == 0) {
199 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
200 write_timings(
"TOTAL",TIMINGS_METHOD_WRITE,gtimings,gstats,communicators,options,0);
201 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
207 barrier_after_close(communicators->
work);
209 if(options->do_read) {
212 sprintf(fname,
"%s.%06d", filename, communicators->workread_rank);
217 barrier_after_start(communicators->
local);
219 read_starttime = starttime = MPI_Wtime();
223 fprintf(stderr,
"cannot open %s for writing, aborting ...\n", fname);
224 MPI_Abort(communicators->
all, 1);
226 stats[STATS_RD_NUM_FILES]=communicators->work_size;
227 timings[TIMINGS_RD_OPEN] = MPI_Wtime()-starttime;
229 starttime = MPI_Wtime();
230 barrier_after_open(communicators->
work);
231 timings[TIMINGS_RD_OPEN_BARR_GLOBAL] = MPI_Wtime()-starttime;
234 starttime = MPI_Wtime();
235 if(options->serialize_blocknum>0) ser_step=options->serialize_blocknum;
236 else ser_step=communicators->local_size;
238 timings[TIMINGS_RD_READ_SYNC] += MPI_Wtime()-starttime;
240 for(ser_count=0;ser_count<communicators->local_size;ser_count+=ser_step) {
242 if ((!ser_done) && communicators->local_rank<(ser_count+ser_step)) {
245 starttime = MPI_Wtime();
246 left = options->totalsize;
251 btoread = options->bufsize;
262 if(!options->suppress_checksum) {
263 checksum_read_fp=0.0;
264 for (i = 0; i < bread; i++)
265 checksum_read_fp += (
double) localbuffer[i];
270 fprintf(stderr,
"timings[%03d] read (%lld bytes) %lld bytes (%10.4f MB) (%lld left)\n", communicators->all_rank, (sion_int64) bread, bsumread,
271 bsumread / 1024.0 / 1024.0, (sion_int64) left);
276 timings[TIMINGS_RD_READ] += MPI_Wtime()-starttime;
278 stats[STATS_BYTES_RD_READ]+=bsumread;
279 stats[STATS_BYTES_RD_NUM_CHUNKS]+=chunkcnt;
281 starttime = MPI_Wtime();
282 barrier_after_read(communicators->
local);
283 timings[TIMINGS_RD_READ_BARR_FILE] += MPI_Wtime()-starttime;
289 starttime = MPI_Wtime();
290 barrier_after_read(communicators->
workread);
291 timings[TIMINGS_RD_READ_BARR_GLOBAL] = MPI_Wtime()-starttime;
293 starttime = MPI_Wtime();
295 timings[TIMINGS_RD_CLOSE] = MPI_Wtime()-starttime;
297 starttime = MPI_Wtime();
298 barrier_after_close(communicators->
workread);
299 timings[TIMINGS_RD_CLOSE_BARR_GLOBAL] = MPI_Wtime()-starttime;
300 timings[TIMINGS_RD_TOTAL] = MPI_Wtime()-read_starttime;
302 starttime = MPI_Wtime();
304 if (timings[TIMINGS_RD_TOTAL] == 0) timings[TIMINGS_RD_TOTAL] = -1;
306 if (options->verbose) {
307 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,1);
311 write_timings(
"TASK",TIMINGS_METHOD_READ,timings,stats,communicators,options,0);
314 timings[TIMINGS_MSGS] += MPI_Wtime()-starttime;
315 if (options->numfiles > 1) {
317 MPI_Reduce(timings, ftimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
local);
318 MPI_Reduce(stats, fstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
local);
320 if (communicators->local_rank == 0) {
321 write_timings(
"FILE",TIMINGS_METHOD_READ,ftimings,fstats,communicators,options,0);
326 MPI_Reduce(timings, gtimings, TIMINGS_MAX_NUM, MPI_DOUBLE, MPI_MAX, 0, communicators->
workread);
327 MPI_Reduce(stats, gstats, STATS_MAX_NUM, SION_MPI_INT64, MPI_SUM, 0, communicators->
workread);
329 DPRINTFTS(communicators->all_rank,
"after red.");
330 if (communicators->workread_rank == 0) {
331 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
332 write_timings(
"TOTAL",TIMINGS_METHOD_READ,gtimings,gstats,communicators,options,0);
333 fprintf(stderr,
"------------------------------------------------------------------------------------------\n");
337 if(!options->suppress_checksum) {
338 if (fabs(checksum_fp - checksum_read_fp) > 1e-5) {
339 fprintf(stderr,
"timings[%03d] ERROR in double checksum %14.10f==%14.10f, diff=%14.10f\n", communicators->all_rank,
340 checksum_fp, checksum_read_fp, checksum_fp - checksum_read_fp);
347 if(options->unlink_files) {
348 starttime = MPI_Wtime();
349 barrier_before_unlink(communicators->
work);
351 barrier_after_unlink(communicators->
work);
352 unlinktime = MPI_Wtime() - starttime;
353 MPI_Reduce(&unlinktime, &gunlinktime, 1, MPI_DOUBLE, MPI_MAX, 0, communicators->
work);
354 if (communicators->work_rank == 0) {
355 fprintf(stderr,
"partest result: ultime=%10.6fs unlink %s\n", gunlinktime, fname);
sion_int64 _sion_file_write(const void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Write data to file.
_sion_fileptr * _sion_file_open(const char *fname, unsigned int flags, unsigned int addflags)
Create and open a new file for writing.
#define SION_FILE_FLAG_WRITE
sion_int64 _sion_file_read(void *data, sion_int64 bytes, _sion_fileptr *sion_fileptr)
Read data from file.
#define SION_FILE_FLAG_READ
#define SION_FILE_FLAG_POSIX
#define SION_FILE_FLAG_ANSI
#define SION_FILE_FLAG_CREATE
int _sion_file_close(_sion_fileptr *sion_fileptr)
Close file and destroy fileptr structure.