SIONlib  1.7.6
Scalable I/O library for parallel access to task-local files
sioninter.pyx
1 # -*- python -*-
2 """Interface for SIONlib
3 
4 This interface wraps SIONlib files for use from Python.
5 """
6 
7 from __future__ import (print_function, division)
8 
9 from libc.stdlib cimport malloc, free
10 from libc.stdio cimport FILE
11 
12 from cpython cimport array
13 from array import array
14 import io
15 
16 cimport csioninter
17 from csioninter cimport sion_int32, sion_int64, MPI_Comm, uint64_t
18 
19 from mpi4py import MPI
20 from mpi4py.mpi_c cimport MPI_COMM_WORLD
21 
22 #### init
23 DEFAULT_PAR_MODE = "serial"
24 
25 #### low level
26 
27 #### open
28 DEFAULT_NTASKS = 1
29 DEFAULT_NFILES = 1
30 DEFAULT_GCOMM = None
31 DEFAULT_LCOMM = None
32 DEFAULT_CHUNKSIZE = 32
33 DEFAULT_CHUNKSIZES = None
34 DEFAULT_FSBLKSIZE = -1
35 DEFAULT_GLOBALRANK = -1
36 DEFAULT_GLOBALRANKS = None
37 DEFAULT_FILEPTR = None
38 
39 #### read / write
40 DEFAULT_NITEMS = 0
41 DEFAULT_SIZE = 1
42 
43 
44 class SIONFile(io.BufferedIOBase):
45  """File-like object for handling SIONlib files"""
46  valid_par_modes = ("serial", "mpi")
47 
48  def __init__(self, par_mode=DEFAULT_PAR_MODE):
49  self._fname = ""
50  self._file_mode = ""
51  self._sid = -1
52  if par_mode not in self.valid_par_modes:
53  raise AttributeError("invalid attribute self._par_mode = {}".
54  format(self._par_mode))
55  self._par_mode = par_mode
56 
57  def open(self, fname, file_mode):
58  """Open a file for reading or writing."""
59  self._fname = fname
60  self._file_mode = file_mode
61  if self._par_mode == "serial":
62  self._sid = sion_open(fname, file_mode)
63  elif self._par_mode == "mpi":
64  self._sid = sion_paropen_mpi(fname, file_mode)
65 
66  @property
67  def closed(self):
68  """Check whether sion file is closed."""
69  return self._sid == -1
70 
71  def close(self):
72  """Close sion file."""
73  if self._par_mode == "serial":
74  sion_close(self._sid)
75  elif self._par_mode == "mpi":
76  sion_parclose_mpi(self._sid)
77  self._fname = ""
78  self._file_mode = ""
79  self._sid = -1
80 
81  def write(self, data):
82  """Write data."""
83  sion_fwrite(data, self._sid)
84 
85  def read(self, nitems=DEFAULT_NITEMS):
86  """Read data."""
87  ret = sion_fread(self._sid, nitems)
88  return ret
89 
90  def write_key(self, data, key):
91  """Write key value pair."""
92  sion_fwrite_key(data, key, self._sid)
93 
94  def read_key(self, key, nitems=DEFAULT_NITEMS):
95  """Read value for corresponding key."""
96  ret = sion_fread_key(key, self._sid, nitems)
97  return ret
98 
99 
100 def open(fname, file_mode, par_mode=DEFAULT_PAR_MODE):
101  """Open a file and return a SIONFile instance."""
102  sionfile = SIONFile(par_mode=par_mode)
103  sionfile.open(fname, file_mode)
104  return sionfile
105 
106 
107 cdef sion_open(fname, file_mode, ntasks=DEFAULT_NTASKS, nfiles=DEFAULT_NFILES,
108  chunksizes=DEFAULT_CHUNKSIZES, fsblksize=DEFAULT_FSBLKSIZE,
109  globalranks=DEFAULT_GLOBALRANKS, fileptr=DEFAULT_FILEPTR):
110  """Open a file and return a sion id."""
111  cdef int _ntasks = ntasks
112  cdef int _nfiles = nfiles
113  cdef sion_int32 _fsblksize = fsblksize
114  cdef int * _globalranks = NULL
115  cdef sion_int64 * _chunksizes = NULL
116  cdef FILE * _fileptr = NULL
117 
118  fname_b = fname.encode()
119  file_mode_b = file_mode.encode()
120 
121  _chunksizes = <sion_int64 * >malloc(_ntasks * sizeof(sion_int64))
122  _globalranks = <int * > malloc(_ntasks * sizeof(int))
123 
124  for rank in range(1):
125  _chunksizes[rank] = DEFAULT_CHUNKSIZE
126  _globalranks[rank] = 0
127 
128  ret = csioninter.sion_open(fname_b, file_mode_b, & _ntasks, & _nfiles,
129  & _chunksizes, & _fsblksize, & _globalranks,
130  & _fileptr)
131 
132  free(_chunksizes)
133  free(_globalranks)
134 
135  return ret
136 
137 
138 cdef sion_close(sid):
139  """Close file."""
140  ret = csioninter.sion_close(sid)
141  return ret
142 
143 
144 cdef sion_paropen_mpi(fname, file_mode, nfiles=DEFAULT_NFILES,
145  gComm=DEFAULT_GCOMM, lComm=DEFAULT_LCOMM,
146  chunksize=DEFAULT_CHUNKSIZE, fsblksize=DEFAULT_FSBLKSIZE,
147  globalrank=DEFAULT_GLOBALRANK, fileptr=DEFAULT_FILEPTR):
148  """Open a file and return a sion id."""
149  cdef int _nfiles = nfiles
150  cdef sion_int32 _fsblksize = fsblksize
151  cdef int _globalrank = globalrank
152  cdef sion_int64 _chunksize = chunksize
153  cdef FILE * _fileptr = NULL
154  cdef MPI_Comm _gComm = NULL
155  cdef MPI_Comm _lComm = NULL
156  cdef char * _newname = NULL
157 
158  fname_b = fname.encode()
159  file_mode_b = file_mode.encode()
160 
161  comm = MPI.COMM_WORLD
162  rank = comm.Get_rank()
163  _gComm = MPI_COMM_WORLD
164  _lComm = _gComm
165  _globalrank = rank
166 
167  ret = csioninter.sion_paropen_mpi(fname_b, file_mode_b, & _nfiles, _gComm,
168  & _lComm, & _chunksize, & _fsblksize,
169  & _globalrank, & _fileptr, & _newname)
170 
171  return ret
172 
173 
174 cdef sion_parclose_mpi(sid):
175  """Open a file and return a sion id."""
176  ret = csioninter.sion_parclose_mpi(sid)
177  return ret
178 
179 
180 cdef sion_fwrite(data, sid):
181  """Write data to file."""
182  cdef char * _data = NULL
183 
184  _data = data
185 
186  bytes_written = csioninter.sion_fwrite(_data, DEFAULT_SIZE, len(data), sid)
187 
188  return bytes_written
189 
190 
191 cdef sion_fread(sid, nitems=DEFAULT_NITEMS):
192  """Read data from file."""
193  cdef size_t _size = DEFAULT_SIZE
194  cdef size_t _nitems = nitems
195 
196  if _nitems == DEFAULT_NITEMS:
197  _nitems = sion_bytes_avail_in_chunk(sid)
198 
199  arr = bytearray(_nitems)
200 
201  bytes_read = csioninter.sion_fread(<char * >arr, _size, _nitems, sid)
202 
203  if bytes_read != len(arr) or bytes_read != _size * _nitems:
204  raise IOError("read not successful")
205 
206  return arr
207 
208 
209 cdef sion_bytes_avail_in_chunk(sid):
210  """Get number of remaining bytes in current chunk."""
211  ret = csioninter.sion_bytes_avail_in_chunk(sid)
212 
213  return ret
214 
215 
216 cpdef sion_fwrite_key(data, key, sid):
217  """Write a key value pair."""
218  cdef char * _data = NULL
219 
220  _data = data
221 
222  ret = csioninter.sion_fwrite_key(_data, key, DEFAULT_SIZE, len(data), sid)
223 
224  return ret
225 
226 
227 cpdef sion_fread_key(key, sid, nitems=DEFAULT_NITEMS):
228  """Read value for corresponding key."""
229  cdef size_t _size = DEFAULT_SIZE
230  cdef size_t _nitems = nitems
231 
232  if _nitems == DEFAULT_NITEMS:
233  _nitems = sion_bytes_avail_in_chunk(sid)
234 
235  arr = bytearray(_nitems)
236 
237  bytes_read = csioninter.sion_fread_key(<char * >arr, key, _size, _nitems,
238  sid)
239 
240  if bytes_read != len(arr) or bytes_read != _size * _nitems:
241  raise IOError("read not successful: {} {} {}".format(bytes_read,
242  len(arr),
243  _size * _nitems))
244 
245  return arr