NetCDFDataset(
*,
filepath,
temppath=None,
load_args=None,
save_args=None,
fs_args=None,
credentials=None,
metadata=None
)
Bases: AbstractDataset
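NetCDFDataset loads and saves data to NetCDF (.nc) files on any fsspec-supported filesystem. Remote files are staged through a local temporary directory (see temppath).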
Example usage for the YAML API:
single-file:
  type: kedro_datasets_experimental.netcdf.NetCDFDataset
  filepath: s3://bucket_name/path/to/folder/data.nc
  # Required for remote filepaths: files are staged here locally.
  temppath: /tmp/netcdf/single-file
  save_args:
    mode: a
  load_args:
    decode_times: False

multi-file:
  type: kedro_datasets_experimental.netcdf.NetCDFDataset
  filepath: s3://bucket_name/path/to/folder/data*.nc
  # Use a separate staging directory so the two datasets' temp files do not clash.
  temppath: /tmp/netcdf/multi-file
  load_args:
    concat_dim: time
    combine: nested
    parallel: True
Example usage for the Python API:
import tempfile
from pathlib import Path

import xarray as xr

from kedro_datasets_experimental.netcdf import NetCDFDataset

# Any writable local directory works for this example.
tmp_path = Path(tempfile.mkdtemp())

ds = xr.DataArray(
    [0, 1, 2], dims=["x"], coords={"x": [0, 1, 2]}, name="data"
).to_dataset()
dataset = NetCDFDataset(
    # A local path, so no ``temppath`` is needed.
    filepath=str(tmp_path / "data.nc"),
    save_args={"mode": "w"},
)
dataset.save(ds)
reloaded = dataset.load()
assert ds.equals(reloaded)
Parameters:
-
filepath
(str)
–
Filepath in POSIX format to a NetCDF file prefixed with a
protocol like s3://. If prefix is not provided, file protocol
(local filesystem) will be used. The prefix should be any protocol
supported by fsspec. It can also be a path to a glob. If a
glob is provided then it can be used for reading multiple NetCDF
files.
-
temppath
(str | None, default:
None
)
–
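Local temporary directory used to stage files when reading from or writing to remote storage,
since NetCDF files cannot be read from or written to remote storage directly. Required whenever filepath is not on the local filesystem.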
-
load_args
(dict[str, Any] | None, default:
None
)
–
Additional options for loading NetCDF file(s).
Here you can find all available arguments when reading single file:
https://xarray.pydata.org/en/stable/generated/xarray.open_dataset.html
Here you can find all available arguments when reading multiple files:
https://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html
All defaults are preserved.
-
save_args
(dict[str, Any] | None, default:
None
)
–
Additional saving options for saving NetCDF file(s).
Here you can find all available arguments:
https://xarray.pydata.org/en/stable/generated/xarray.Dataset.to_netcdf.html
All defaults are preserved.
-
fs_args
(dict[str, Any] | None, default:
None
)
–
Extra arguments to pass into underlying filesystem class
constructor (e.g. {"cache_regions": "us-east-1"} for
s3fs.S3FileSystem).
-
credentials
(dict[str, Any] | None, default:
None
)
–
Credentials required to get access to the underlying filesystem.
E.g. for GCSFileSystem it should look like {"token": None}.
-
metadata
(dict[str, Any] | None, default:
None
)
–
Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def __init__(  # noqa
    self,
    *,
    filepath: str,
    temppath: str | None = None,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    credentials: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
):
    """Creates a new instance of ``NetCDFDataset`` pointing to a concrete NetCDF
    file on a specific filesystem.

    Args:
        filepath: Filepath in POSIX format to a NetCDF file prefixed with a
            protocol like `s3://`. If prefix is not provided, `file` protocol
            (local filesystem) will be used. The prefix should be any protocol
            supported by ``fsspec``. It can also be a path to a glob. If a
            glob is provided then it can be used for reading multiple NetCDF
            files.
        temppath: Local temporary directory used to stage files when reading
            from or writing to remote storage, since NetCDF files cannot be
            read or written on remote storage directly. Required whenever
            ``filepath`` is not on the local filesystem.
        load_args: Additional options for loading NetCDF file(s).
            Here you can find all available arguments when reading single file:
            https://xarray.pydata.org/en/stable/generated/xarray.open_dataset.html
            Here you can find all available arguments when reading multiple files:
            https://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html
            All defaults are preserved.
        save_args: Additional saving options for saving NetCDF file(s).
            Here you can find all available arguments:
            https://xarray.pydata.org/en/stable/generated/xarray.Dataset.to_netcdf.html
            All defaults are preserved.
        fs_args: Extra arguments to pass into underlying filesystem class
            constructor (e.g. `{"cache_regions": "us-east-1"}` for
            ``s3fs.S3FileSystem``).
        credentials: Credentials required to get access to the underlying filesystem.
            E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        ValueError: If ``filepath`` is on a remote filesystem and no
            ``temppath`` is given.
    """
    self._fs_args = deepcopy(fs_args) or {}
    self._credentials = deepcopy(credentials) or {}
    self._temppath = Path(temppath) if temppath is not None else None

    protocol, _ = get_protocol_and_path(filepath)
    if protocol == "file":
        self._fs_args.setdefault("auto_mkdir", True)
    elif self._temppath is None:
        # Remote NetCDF files are always staged through a local copy, so a
        # staging directory is mandatory for any non-local protocol.
        raise ValueError(
            "Need to set temppath in catalog if NetCDF file exists on remote "
            + "filesystem"
        )

    self._protocol = protocol
    self._filepath = filepath
    # Credentials and fs_args are merged into one set of fsspec options;
    # fs_args win on key collisions.
    self._storage_options = {**self._credentials, **self._fs_args}
    self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

    self.metadata = metadata

    # Handle default load and save arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}

    # A '*' in the file name (not the directory part) marks a multifile glob.
    self._is_multifile = "*" in str(PurePosixPath(self._filepath).stem)
|
DEFAULT_LOAD_ARGS
class-attribute
instance-attribute
DEFAULT_SAVE_ARGS
class-attribute
instance-attribute
_credentials
instance-attribute
_credentials = deepcopy(credentials) or {}
_filepath
instance-attribute
_fs
instance-attribute
_fs = filesystem(_protocol, **(_storage_options))
_fs_args
instance-attribute
_fs_args = deepcopy(fs_args) or {}
_is_multifile
instance-attribute
_is_multifile = True if '*' in str(stem) else False
_load_args
instance-attribute
_load_args = {**DEFAULT_LOAD_ARGS, **(load_args or {})}
_protocol
instance-attribute
_save_args
instance-attribute
_save_args = {**DEFAULT_SAVE_ARGS, **(save_args or {})}
_storage_options
instance-attribute
_storage_options = {**_credentials, **_fs_args}
_temppath
instance-attribute
_temppath = Path(temppath) if temppath is not None else None
__del__
Cleanup temporary directory
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def __del__(self):
    """Cleanup temporary directory.

    Deletes the local copies of the NetCDF file(s) that ``load``/``save``
    stage under ``temppath`` for remote datasets. Files that are already
    gone are ignored.
    """
    # ``__del__`` also runs when ``__init__`` raised part-way through, so
    # read attributes defensively instead of assuming they exist.
    temppath = getattr(self, "_temppath", None)
    filepath = getattr(self, "_filepath", None)
    protocol = getattr(self, "_protocol", None)
    # Only remote datasets stage files under ``temppath``; for local ones the
    # directory may hold the user's real data, so never touch it.
    if temppath is None or filepath is None or protocol in (None, "file"):
        return

    logger.info("Deleting local temporary files.")
    # Staged copies keep the remote file name, i.e. ``<temppath>/data.nc``,
    # which is exactly where ``save`` writes and ``load`` reads.
    local_copy = str(temppath / PurePosixPath(filepath).name)
    if getattr(self, "_is_multifile", False):
        # For a multifile dataset the name is a glob such as ``data*.nc``.
        targets = glob(local_copy)
    else:
        targets = [local_copy]
    for target in targets:
        Path(target).unlink(missing_ok=True)
|
_describe
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
174
175
176
177
178
179
def _describe(self) -> dict[str, Any]:
    """Return a dictionary describing this dataset's location and I/O options."""
    description = {
        "filepath": self._filepath,
        "protocol": self._protocol,
        "load_args": self._load_args,
        "save_args": self._save_args,
    }
    return description
|
_exists
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
182
183
184
185
186
187
188
189
190
def _exists(self) -> bool:
    """Report whether the dataset's file (or any file matching its glob) exists."""
    if not self._is_multifile:
        return self._fs.exists(self._filepath)
    # A globbed dataset exists as soon as at least one file matches.
    return bool(self._fs.glob(self._filepath))
|
_invalidate_cache
Invalidate underlying filesystem caches.
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
def _invalidate_cache(self):
    """Invalidate the underlying filesystem's caches for this dataset's filepath."""
    filesystem, target = self._fs, self._filepath
    filesystem.invalidate_cache(target)
|
load
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def load(self) -> xr.Dataset:
    """Load the NetCDF file(s) at ``filepath`` as an ``xarray.Dataset``.

    Remote files are first copied into ``temppath`` and the local copies are
    opened. A globbed ``filepath`` is opened with ``xarray.open_mfdataset``;
    a single file is opened with ``xarray.open_dataset``. ``load_args`` are
    passed through to either call.

    Returns:
        The loaded dataset.

    Raises:
        DatasetError: If the dataset is remote but ``temppath`` is unset.
    """
    load_path = str(self._filepath)

    # If NetCDF(s) are on any type of remote storage, need to sync to local to open.
    # Kerchunk could be implemented here in the future for direct remote reading.
    if self._protocol != "file":
        if self._temppath is None:
            raise DatasetError("_temppath should have been set in __init__")
        logger.info("Syncing remote NetCDF file to local storage.")
        self._temppath.mkdir(parents=True, exist_ok=True)
        # For a multifile dataset ``load_path`` is a glob; fsspec expands it
        # and copies every match into ``temppath``.
        self._fs.get(load_path, f"{self._temppath}/")
        # Open the *local* copies, which keep their remote file name(s). For a
        # multifile dataset this is again a glob (e.g. ``data*.nc``) that
        # ``open_mfdataset`` expands over the staging directory.
        load_path = str(self._temppath / PurePosixPath(self._filepath).name)

    if self._is_multifile:
        return xr.open_mfdataset(load_path, **self._load_args)
    return xr.open_dataset(load_path, **self._load_args)
|
save
Source code in kedro_datasets_experimental/netcdf/netcdf_dataset.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def save(self, data: xr.Dataset):
    """Write ``data`` to ``filepath`` as a single NetCDF file.

    Local files are written directly. For remote storage, the file is first
    written to ``temppath`` under its remote file name and then uploaded.
    ``save_args`` are passed through to ``xarray.Dataset.to_netcdf``.

    Args:
        data: The dataset to save.

    Raises:
        DatasetError: If ``filepath`` is a multifile glob, or the dataset is
            remote but ``temppath`` is unset.
    """
    if self._is_multifile:
        raise DatasetError(
            "Globbed multifile datasets with '*' in filepath cannot be saved. "
            + "Create an alternate NetCDFDataset with a single .nc output file."
        )

    if self._protocol == "file":
        data.to_netcdf(path=self._filepath, **self._save_args)
    else:
        if self._temppath is None:
            raise DatasetError("_temppath should have been set in __init__")
        # ``to_netcdf`` does not create missing directories, and the staging
        # directory may not exist yet if nothing has been loaded.
        self._temppath.mkdir(parents=True, exist_ok=True)
        temp_save_path = self._temppath / PurePosixPath(self._filepath).name
        data.to_netcdf(path=str(temp_save_path), **self._save_args)
        # Sync to remote storage
        self._fs.put_file(str(temp_save_path), self._filepath)

    self._invalidate_cache()
|