#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Generic utility classes to write and read Cartesian grid field solutions into binary files.
This module implements the base file handler class :class:`FieldsIO`, which is specialized into:
- :class:`Scal0D` : for 0D fields (scalars) with a given number of variables
- :class:`Cart1D` : for 1D fields with a given number of variables
- :class:`Cart2D` : for 2D fields on a Cartesian grid with a given number of variables
While each file handler needs to be set up with specific parameters (grid, ...),
every written file can be read using the same interface implemented in the
abstract base class.
Example
-------
>>> import numpy as np
>>> from pySDC.helpers.fieldsIO import Cart1D, FieldsIO
>>>
>>> # Write some fields in files
>>> x = np.linspace(0, 1, 101)
>>> fOut = Cart1D(np.float64, "file.pysdc")
>>> fOut.setHeader(nVar=2, coordX=x)
>>> fOut.initialize()
>>> times = [0, 1, 2]
>>> u0 = np.array([-1, 1])[:, None]*x[None, :]
>>> for t in times:
>>> fOut.addField(t, t*u0)
>>>
>>> # Read the file using the generic interface
>>> fIn = FieldsIO.fromFile("file.pysdc")
>>> times = fIn.times
>>> assert len(times) == fIn.nFields
>>> tEnd, uEnd = fIn.readField(-1)
>>> assert tEnd == times[-1]
Notes
-----
🚀 :class:`Cart1D` and :class:`Cart2D` are compatible with an MPI-based Cartesian decomposition.
See :class:`pySDC.tests.test_helpers.test_fieldsIO.writeFields_MPI` for an illustrative example.
Warning
-------
To use MPI collective writing, you first need to call the class method :class:`Cart1D.setupMPI`
or :class:`Cart2D.setupMPI` on the associated class (cf. their docstrings).
Also, their associated `setHeader` methods **must be given the global grid coordinates**,
whether the code is run in parallel or not.
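A minimal MPI usage sketch is given below (``xGlobal``, ``uLocal`` and the values passed
for ``iLocX``/``nLocX`` are placeholder names for the global grid, the local field block
and the local decomposition indices, not objects defined by this module):
>>> from mpi4py import MPI
>>> comm = MPI.COMM_WORLD
>>> Cart1D.setupMPI(comm, iLocX=iLocX, nLocX=nLocX)
>>> fOut = Cart1D(np.float64, "file_mpi.pysdc")
>>> fOut.setHeader(nVar=2, coordX=xGlobal)  # global coordinates, on every rank
>>> fOut.initialize()
>>> fOut.addField(0.0, uLocal)  # local block with shape (nVar, nLocX)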
"""
import os
import numpy as np
from typing import Type, TypeVar
import logging
T = TypeVar("T")
try:
try:
import dolfin as df # noqa: F841 (for some reason, dolfin always needs to be imported before mpi4py)
except ImportError:
pass
from mpi4py import MPI
except ImportError:
class MPI:
COMM_WORLD = None
Intracomm = T
# Supported data types
DTYPES = {
0: np.float64, # double precision
1: np.complex128,
}
try:
DTYPES.update(
{
2: np.float128, # quadruple precision
3: np.complex256,
}
)
except AttributeError:
logging.getLogger('FieldsIO').debug('Warning: Quadruple precision not available on this machine')
try:
DTYPES.update(
{
4: np.float32, # single precision
5: np.complex64,
}
)
except AttributeError:
logging.getLogger('FieldsIO').debug('Warning: Single precision not available on this machine')
DTYPES_AVAIL = {val: key for key, val in DTYPES.items()}
# Header dtype
H_DTYPE = np.int8
T_DTYPE = np.float64
class FieldsIO:
"""Abstract IO file handler"""
STRUCTS = {} # supported structures, modified dynamically
sID = None # structure ID of the FieldsIO class, modified dynamically
tSize = T_DTYPE().itemsize
ALLOW_OVERWRITE = False
def __init__(self, dtype, fileName):
"""
Parameters
----------
dtype : np.dtype
The data type of the fields values.
fileName : str
            Name of the binary file.
"""
assert dtype in DTYPES_AVAIL, f"{dtype=} not available. Supported on this machine: {list(DTYPES_AVAIL.keys())}"
self.dtype = dtype
self.fileName = fileName
self.initialized = False
# Initialized by the setHeader abstract method
self.header = None
self.nItems = None # number of values (dof) stored into one field
def __str__(self):
return f"FieldsIO[{self.__class__.__name__}|{self.dtype.__name__}|file:{self.fileName}]<{hex(id(self))}>"
def __repr__(self):
return self.__str__()
@classmethod
def fromFile(cls, fileName):
"""
Read a file storing fields, and return the `FieldsIO` of the appropriate
field type (structure).
Parameters
----------
fileName : str
Name of the binary file.
Returns
-------
fieldsIO : :class:`FieldsIO`
The specialized `FieldsIO` adapted to the file.
"""
assert os.path.isfile(fileName), f"not a file ({fileName})"
with open(fileName, "rb") as f:
STRUCT, DTYPE = np.fromfile(f, dtype=H_DTYPE, count=2)
fieldsIO: FieldsIO = cls.STRUCTS[STRUCT](DTYPES[DTYPE], fileName)
fieldsIO.readHeader(f)
fieldsIO.initialized = True
return fieldsIO
@property
def hBase(self) -> np.ndarray:
"""Base header into numpy array format"""
return np.array([self.sID, DTYPES_AVAIL[self.dtype]], dtype=H_DTYPE)
@classmethod
def register(cls, sID):
"""
        Decorator used to register a new specialized FieldsIO class.
Parameters
----------
sID : int
            Unique identifier for the file structure, stored in the binary file.
            Since it is encoded as a 1-byte (8-bit) signed integer,
            it must be between -128 and 127.
Example
-------
>>> # New specialized FieldsIO class
>>> @FieldsIO.register(sID=31)
>>> class HexaMesh2D(FieldsIO):
>>> pass # ... implementation
"""
def wrapper(registered: Type[T]) -> Type[T]:
assert (
sID not in cls.STRUCTS
), f"struct ID already taken by {cls.STRUCTS[sID]}, cannot use it for {registered}"
cls.STRUCTS[sID] = registered
registered.sID = sID
return registered
return wrapper
def initialize(self):
"""Initialize the file handler : create the file with header, removing any existing file with the same name"""
assert self.header is not None, "header must be set before initializing FieldsIO"
assert not self.initialized, "FieldsIO already initialized"
if not self.ALLOW_OVERWRITE:
assert not os.path.isfile(
self.fileName
), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"
with open(self.fileName, "w+b") as f:
self.hBase.tofile(f)
for array in self.hInfos:
array.tofile(f)
self.initialized = True
@property
def hInfos(self) -> list[np.ndarray]:
"""(Abstract) Array representing the grid structure to be written in the binary file."""
raise NotImplementedError()
@property
def hSize(self):
"""Size of the full header (in bytes)"""
return self.hBase.nbytes + sum(hInfo.nbytes for hInfo in self.hInfos)
@property
def itemSize(self):
"""Size of one field value (in bytes)"""
return self.dtype().itemsize
@property
def fSize(self):
"""Full size of a field (in bytes)"""
return self.nItems * self.itemSize
@property
def fileSize(self):
"""Current size of the file (in bytes)"""
return os.path.getsize(self.fileName)
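    # Binary file layout: [hBase | hInfos ...] header, then one record per stored
    # field, appended in write order: [time (T_DTYPE) | nItems values (self.dtype)].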
def addField(self, time, field):
"""
Append one field solution at the end of the file with one given time.
Parameters
----------
time : float-like
The associated time of the field solution.
field : np.ndarray
The field values.
"""
assert self.initialized, "cannot add field to a non initialized FieldsIO"
field = np.asarray(field)
assert field.dtype == self.dtype, f"expected {self.dtype} dtype, got {field.dtype}"
assert field.size == self.nItems, f"expected {self.nItems} values, got {field.size}"
with open(self.fileName, "ab") as f:
np.array(time, dtype=T_DTYPE).tofile(f)
field.tofile(f)
@property
def nFields(self):
"""Number of fields currently stored in the binary file"""
return int((self.fileSize - self.hSize) // (self.tSize + self.fSize))
@property
def times(self):
"""Vector of all times stored in the binary file"""
times = []
with open(self.fileName, "rb") as f:
f.seek(self.hSize)
for i in range(self.nFields):
t = np.fromfile(f, dtype=T_DTYPE, count=1, offset=0 if i == 0 else self.fSize)[0]
times.append(float(t))
return times
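    # NOTE: `formatIndex`, used by `time` and `readField` below, is not shown in this
    # excerpt; the sketch below assumes standard negative-index (wrap-around) semantics
    # and may differ from the original implementation.
    def formatIndex(self, idx):
        """Check and format a field index, allowing negative (wrap-around) values (sketch)"""
        nFields = self.nFields
        if idx < 0:
            idx += nFields
        assert 0 <= idx < nFields, f"cannot access field index {idx} among {nFields} fields"
        return idx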
def time(self, idx):
"""Time stored at a given field index"""
idx = self.formatIndex(idx)
offset = self.hSize + idx * (self.tSize + self.fSize)
with open(self.fileName, "rb") as f:
t = np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset)[0]
return float(t)
def readField(self, idx):
"""
Read one field stored in the binary file, corresponding to the given
time index.
Parameters
----------
idx : int
Positional index of the field.
Returns
-------
t : float
Stored time for this field.
field : np.ndarray
Read fields in a numpy array.
"""
idx = self.formatIndex(idx)
offset = self.hSize + idx * (self.tSize + self.fSize)
with open(self.fileName, "rb") as f:
f.seek(offset)
t = float(np.fromfile(f, dtype=T_DTYPE, count=1)[0])
field = np.fromfile(f, dtype=self.dtype, count=self.nItems)
self.reshape(field)
return t, field
def reshape(self, field):
"""Eventually reshape the field to correspond to the grid structure"""
pass
@FieldsIO.register(sID=0)
class Scal0D(FieldsIO):
"""FieldsIO handler storing a given number of scalar"""
# -------------------------------------------------------------------------
    # Overridden methods
# -------------------------------------------------------------------------
@property
def hInfos(self):
"""Array representing the grid structure to be written in the binary file."""
return [np.array([self.nVar], dtype=np.int64)]
# -------------------------------------------------------------------------
# Class specifics
# -------------------------------------------------------------------------
@property
def nVar(self):
"""Number of variables in a fields, as described in the header"""
return self.header["nVar"]
@FieldsIO.register(sID=1)
class Cart1D(Scal0D):
"""FieldsIO handler storing a given number of 2D scalar variables"""
# -------------------------------------------------------------------------
    # Overridden methods
# -------------------------------------------------------------------------
@property
def hInfos(self):
"""Array representing the grid structure to be written in the binary file."""
return [np.array([self.nVar, self.nX], dtype=np.int64), np.array(self.header["coordX"], dtype=np.float64)]
    def reshape(self, fields: np.ndarray):
        """Reshape the fields to a [nVar, nX] array (inplace operation)"""
        fields.shape = (self.nVar, self.nX)
# -------------------------------------------------------------------------
# Class specifics
# -------------------------------------------------------------------------
@property
def nX(self):
"""Number of points in x direction"""
return self.header["coordX"].size
@staticmethod
def setupCoords(**coords):
"""Utility function to setup grids in multuple dimensions, given the keyword arguments"""
coords = {name: np.asarray(coord, dtype=np.float64) for name, coord in coords.items()}
for name, coord in coords.items():
assert coord.ndim == 1, f"{name} must be one dimensional"
return coords
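    # NOTE: `setHeader` and `readHeader` are not shown in this excerpt; the sketches
    # below are consistent with `hInfos` and `setupCoords` above, but may differ
    # from the original implementation.
    def setHeader(self, nVar, coordX):
        """Set the header with nVar and the global 1D grid coordinates (sketch)"""
        coords = self.setupCoords(coordX=coordX)
        self.header = {"nVar": int(nVar), **coords}
        self.nItems = int(nVar) * self.nX

    def readHeader(self, f):
        """Read the header from an open binary file handle (sketch)"""
        nVar, nX = np.fromfile(f, dtype=np.int64, count=2)
        coordX = np.fromfile(f, dtype=np.float64, count=int(nX))
        self.setHeader(int(nVar), coordX)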
# -------------------------------------------------------------------------
# MPI-parallel implementation
# -------------------------------------------------------------------------
comm: MPI.Intracomm = None
@classmethod
def setupMPI(cls, comm: MPI.Intracomm, iLocX, nLocX):
"""
        Set up the MPI mode for the file IO, considering a decomposition
        of the 1D grid into contiguous subintervals.
Parameters
----------
comm : MPI.Intracomm
The space decomposition communicator.
iLocX : int
Starting index of the local sub-domain in the global `coordX`.
nLocX : int
Number of points in the local sub-domain.
"""
cls.comm = comm
cls.iLocX = iLocX
cls.nLocX = nLocX
cls.mpiFile = None
@property
def MPI_ON(self):
"""Wether or not MPI is activated"""
if self.comm is None:
return False
return self.comm.Get_size() > 1
@property
def MPI_ROOT(self):
"""Wether or not the process is MPI Root"""
if self.comm is None:
return True
return self.comm.Get_rank() == 0
def MPI_FILE_OPEN(self, mode):
"""Open the binary file in MPI mode"""
amode = {
"r": MPI.MODE_RDONLY,
"a": MPI.MODE_WRONLY | MPI.MODE_APPEND,
}[mode]
self.mpiFile = MPI.File.Open(self.comm, self.fileName, amode)
def MPI_WRITE(self, data):
"""Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
self.mpiFile.Write(data)
def MPI_WRITE_AT(self, offset, data: np.ndarray):
"""
Write data in the binary file in MPI mode, with a given offset
**relative to the beginning of the file**.
Parameters
----------
offset : int
Offset to write at, relative to the beginning of the file, in bytes.
data : np.ndarray
Data to be written in the binary file.
"""
self.mpiFile.Write_at(offset, data)
def MPI_READ_AT(self, offset, data):
"""
Read data from the binary file in MPI mode, with a given offset
**relative to the beginning of the file**.
Parameters
----------
offset : int
Offset to read at, relative to the beginning of the file, in bytes.
data : np.ndarray
Array on which to read the data from the binary file.
"""
self.mpiFile.Read_at(offset, data)
def MPI_FILE_CLOSE(self):
"""Close the binary file in MPI mode"""
self.mpiFile.Close()
self.mpiFile = None
def initialize(self):
"""Initialize the binary file (write header) in MPI mode"""
if self.MPI_ROOT:
try:
super().initialize()
except AssertionError as e:
if self.MPI_ON:
print(f"{type(e)}: {e}")
self.comm.Abort()
else:
raise e
if self.MPI_ON:
self.comm.Barrier() # Important, should not be removed !
self.initialized = True
def addField(self, time, field):
"""
Append one field solution at the end of the file with one given time,
possibly using MPI.
Parameters
----------
time : float-like
The associated time of the field solution.
field : np.ndarray
The (local) field values.
Note
----
If a MPI decomposition is used, field **must be** the local field values.
"""
if not self.MPI_ON:
return super().addField(time, field)
assert self.initialized, "cannot add field to a non initialized FieldsIO"
field = np.asarray(field)
assert field.dtype == self.dtype, f"expected {self.dtype} dtype, got {field.dtype}"
assert field.shape == (self.nVar, self.nLocX), f"expected {(self.nVar, self.nLocX)} shape, got {field.shape}"
offset0 = self.fileSize
self.MPI_FILE_OPEN(mode="a")
if self.MPI_ROOT:
self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
offset0 += self.tSize
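        # Per-field layout: nVar contiguous blocks of nX values each; this rank
        # writes its nLocX values starting at global index iLocX inside each block.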
for iVar in range(self.nVar):
offset = offset0 + (iVar * self.nX + self.iLocX) * self.itemSize
self.MPI_WRITE_AT(offset, field[iVar])
self.MPI_FILE_CLOSE()
def readField(self, idx):
"""
Read one field stored in the binary file, corresponding to the given
time index, possibly in MPI mode.
Parameters
----------
idx : int
Positional index of the field.
Returns
-------
t : float
Stored time for this field.
field : np.ndarray
Read (local) fields in a numpy array.
Note
----
If a MPI decomposition is used, it reads and returns the local fields values only.
"""
if not self.MPI_ON:
return super().readField(idx)
idx = self.formatIndex(idx)
offset0 = self.hSize + idx * (self.fSize + self.tSize)
with open(self.fileName, "rb") as f:
t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0])
offset0 += self.tSize
field = np.empty((self.nVar, self.nLocX), dtype=self.dtype)
self.MPI_FILE_OPEN(mode="r")
for iVar in range(self.nVar):
offset = offset0 + (iVar * self.nX + self.iLocX) * self.itemSize
self.MPI_READ_AT(offset, field[iVar])
self.MPI_FILE_CLOSE()
return t, field
@FieldsIO.register(sID=2)
class Cart2D(Cart1D):
"""FieldsIO handler storing a given number of 2D scalar variables"""
# -------------------------------------------------------------------------
    # Overridden methods
# -------------------------------------------------------------------------
@property
def hInfos(self):
"""Array representing the grid structure to be written in the binary file."""
return [
np.array([self.nVar, self.nX, self.nY], dtype=np.int64),
np.array(self.header["coordX"], dtype=np.float64),
np.array(self.header["coordY"], dtype=np.float64),
]
def reshape(self, fields: np.ndarray):
"""Reshape the fields to a [nVar, nX, nY] array (inplace operation)"""
fields.shape = (self.nVar, self.nX, self.nY)
# -------------------------------------------------------------------------
# Class specifics
# -------------------------------------------------------------------------
@property
def nY(self):
"""Number of points in y direction"""
return self.header["coordY"].size
# -------------------------------------------------------------------------
# MPI-parallel implementation
# -------------------------------------------------------------------------
@classmethod
def setupMPI(cls, comm: MPI.Intracomm, iLocX, nLocX, iLocY, nLocY):
super().setupMPI(comm, iLocX, nLocX)
cls.iLocY = iLocY
cls.nLocY = nLocY
def addField(self, time, field):
"""
Append one field solution at the end of the file with one given time,
possibly using MPI.
Parameters
----------
time : float-like
The associated time of the field solution.
field : np.ndarray
The (local) field values.
Note
----
If a MPI decomposition is used, field **must be** the local field values.
"""
if not self.MPI_ON:
return super().addField(time, field)
assert self.initialized, "cannot add field to a non initialized FieldsIO"
field = np.asarray(field)
assert field.dtype == self.dtype, f"expected {self.dtype} dtype, got {field.dtype}"
assert field.shape == (
self.nVar,
self.nLocX,
self.nLocY,
), f"expected {(self.nVar, self.nLocX, self.nLocY)} shape, got {field.shape}"
offset0 = self.fileSize
self.MPI_FILE_OPEN(mode="a")
if self.MPI_ROOT:
self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
offset0 += self.tSize
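        # Per-field layout: row-major [nVar, nX, nY]; each local row (iVar, iLocX + iX, :)
        # is written as nLocY contiguous values starting at column iLocY.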
for iVar in range(self.nVar):
for iX in range(self.nLocX):
offset = offset0 + (iVar * self.nX * self.nY + (self.iLocX + iX) * self.nY + self.iLocY) * self.itemSize
self.MPI_WRITE_AT(offset, field[iVar, iX])
self.MPI_FILE_CLOSE()
def readField(self, idx):
"""
Read one field stored in the binary file, corresponding to the given
        time index, possibly in MPI mode.
Parameters
----------
idx : int
Positional index of the field.
Returns
-------
t : float
Stored time for this field.
field : np.ndarray
Read (local) fields in a numpy array.
Note
----
If a MPI decomposition is used, it reads and returns the local fields values only.
"""
if not self.MPI_ON:
return super().readField(idx)
idx = self.formatIndex(idx)
offset0 = self.hSize + idx * (self.tSize + self.fSize)
with open(self.fileName, "rb") as f:
t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0])
offset0 += self.tSize
field = np.empty((self.nVar, self.nLocX, self.nLocY), dtype=self.dtype)
self.MPI_FILE_OPEN(mode="r")
for iVar in range(self.nVar):
for iX in range(self.nLocX):
offset = offset0 + (iVar * self.nX * self.nY + (self.iLocX + iX) * self.nY + self.iLocY) * self.itemSize
self.MPI_READ_AT(offset, field[iVar, iX])
self.MPI_FILE_CLOSE()
return t, field