Skip to content

video.VideoDataset

kedro_datasets_experimental.video.VideoDataset

VideoDataset(
    *,
    filepath,
    fourcc="mp4v",
    credentials=None,
    fs_args=None,
    metadata=None
)

Bases: AbstractDataset[AbstractVideo, AbstractVideo]

VideoDataset loads/saves video data from a given filepath as a sequence of PIL.Image.Image using OpenCV.

Example usage for the YAML API
cars:
    type: video.VideoDataset
    filepath: data/01_raw/cars.mp4

motorbikes:
    type: video.VideoDataset
    filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.mp4
    credentials: dev_s3
Example usage for the Python API:
from kedro_datasets_experimental.video import VideoDataset
import numpy as np

video = VideoDataset(
    filepath="https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4"
).load()
frame = video[0]
Example creating a video from numpy frames using Python API
from kedro_datasets_experimental.video.video_dataset import VideoDataset, SequenceVideo
import numpy as np
from PIL import Image

frame = np.ones((640, 480, 3), dtype=np.uint8) * 255
imgs = []
for i in range(255):
    imgs.append(Image.fromarray(frame))
    frame -= 1

video = VideoDataset(filepath=tmp_path / "my_video.mp4")
video.save(SequenceVideo(imgs, fps=25))
Example creating a video from numpy frames using a generator and the Python API
from kedro_datasets_experimental.video.video_dataset import VideoDataset, GeneratorVideo
import numpy as np
from PIL import Image

def gen():
    frame = np.ones((640, 480, 3), dtype=np.uint8) * 255
    for i in range(255):
        yield Image.fromarray(frame)
        frame -= 1

video = VideoDataset(filepath=tmp_path / "my_video.mp4")
video.save(GeneratorVideo(gen(), fps=25, length=None))

Parameters:

  • filepath (str) –

    The location of the video file to load / save data.

  • fourcc (str | None, default: 'mp4v' ) –

    The codec to use when writing video. Note that, depending on how OpenCV is installed, more or fewer codecs may be available. If set to None, the fourcc from the video object will be used.

  • credentials (dict[str, Any] | None, default: None ) –

    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args (dict[str, Any] | None, default: None ) –

    Extra arguments to pass into underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Source code in kedro_datasets_experimental/video/video_dataset.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def __init__(  # noqa: PLR0913
    self,
    *,
    filepath: str,
    fourcc: str | None = "mp4v",
    credentials: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Set up a VideoDataset that loads / saves video data at the given filepath.

    Args:
        filepath: The location of the video file to load / save data.
        fourcc: The codec to use when writing video. Depending on how opencv is
            installed, more or fewer codecs may be available. If set to None, the
            fourcc from the video object will be used.
        credentials: Credentials required to get access to the underlying filesystem.
            E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
        fs_args: Extra arguments to pass into underlying filesystem class constructor
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    """
    # Separate the protocol (e.g. file, http, s3, etc.) from the path itself.
    self._protocol, raw_path = get_protocol_and_path(filepath)
    self._filepath = PurePosixPath(raw_path)
    self._fourcc = fourcc

    # Work on copies so the caller's dictionaries are never shared with this
    # instance; a missing (None) or empty argument becomes an empty dict.
    extra_fs_args = deepcopy(fs_args) or {}
    access_credentials = deepcopy(credentials) or {}

    # fs_args is unpacked last, so on a key clash it overrides credentials.
    self._storage_options = {**access_credentials, **extra_fs_args}
    self._fs = fsspec.filesystem(self._protocol, **self._storage_options)
    self.metadata = metadata

_filepath instance-attribute

_filepath = PurePosixPath(path)

_fourcc instance-attribute

_fourcc = fourcc

_fs instance-attribute

_fs = filesystem(_protocol, **(_storage_options))

_protocol instance-attribute

_protocol = protocol

_storage_options instance-attribute

_storage_options = {**_credentials, **_fs_args}

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets_experimental/video/video_dataset.py
360
361
def _describe(self) -> dict[str, Any]:
    """Return the filepath and protocol that identify this dataset."""
    description: dict[str, Any] = {}
    description["filepath"] = self._filepath
    description["protocol"] = self._protocol
    return description

_exists

_exists()
Source code in kedro_datasets_experimental/video/video_dataset.py
363
364
def _exists(self) -> bool:
    """Ask the dataset's filesystem object whether the configured filepath exists."""
    filesystem = self._fs
    return filesystem.exists(self._filepath)

_write_to_filepath

_write_to_filepath(video, filepath)
Source code in kedro_datasets_experimental/video/video_dataset.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def _write_to_filepath(self, video: AbstractVideo, filepath: str) -> None:
    """Encode ``video`` with OpenCV and write it to ``filepath``.

    Args:
        video: The video providing the frames, fps and size to write.
        filepath: The path handed to ``cv2.VideoWriter``.

    Raises:
        ValueError: If the video writer could not be opened.
    """
    # TODO: The codec configured on the VideoDataset wins whenever it is not None. This is
    # for compatibility: e.g. the h264 codec is licensed and therefore not included in
    # opencv when installed from a binary distribution. Since a h264 video can be read but
    # not written, relying on the video's own fourcc code would be error prone. A further
    # issue is that the video object does not know which container format will be used,
    # because that is chosen by the file name suffix of the VideoDataset. Some combinations
    # of codec and container format might not work or will have bad support.
    codec = self._fourcc if self._fourcc else video.fourcc
    codec_code = cv2.VideoWriter_fourcc(*codec)

    writer = cv2.VideoWriter(filepath, codec_code, video.fps, video.size)
    if not writer.isOpened():
        message = (
            "Failed to open video writer with params: "
            f"fourcc={codec} fps={video.fps} size={video.size[0]}x{video.size[1]} "
            f"path={filepath}"
        )
        raise ValueError(message)

    try:
        for pil_frame in video:
            rgb_pixels = np.asarray(pil_frame)
            # PIL images are RGB, opencv expects BGR, so reverse the channel axis.
            writer.write(rgb_pixels[:, :, ::-1])
    finally:
        # Always release the writer, even if a frame fails to write.
        writer.release()

load

load()

Loads data from the video file.

Returns:

  • AbstractVideo

    Data from the video file as an AbstractVideo object

Source code in kedro_datasets_experimental/video/video_dataset.py
299
300
301
302
303
304
305
306
307
308
309
310
def load(self) -> AbstractVideo:
    """Loads data from the video file.

    The file is opened through an fsspec ``filecache::`` chained URL and the
    name of the opened file object is passed to ``FileVideo``.

    Returns:
        Data from the video file as an AbstractVideo object
    """
    cached_url = f"filecache::{self._protocol}://{self._filepath}"
    # The storage options are passed keyed by the protocol name, so they are
    # addressed to that protocol within the chained URL.
    options_by_protocol = {self._protocol: self._storage_options}
    with fsspec.open(cached_url, mode="rb", **options_by_protocol) as cached_file:
        local_name = cached_file.name
        return FileVideo(local_name)

save

save(data)

Saves video data to the specified filepath.

Source code in kedro_datasets_experimental/video/video_dataset.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def save(self, data: AbstractVideo) -> None:
    """Saves video data to the specified filepath.

    For the ``file`` protocol the video is written straight to the
    destination. For any other protocol it is first written to a temporary
    local file, which is then streamed to the destination with fsspec.

    Args:
        data: The video to write.

    Raises:
        ValueError: If the video writer cannot be opened (raised by
            ``_write_to_filepath``).
    """
    import shutil  # only needed here, for the chunked copy to the destination

    if self._protocol == "file":
        # Write directly to the local file destination
        self._write_to_filepath(data, str(self._filepath))
    else:
        # VideoWriter can't write to an open file object, instead write to a
        # local tmpfile and then copy that to the destination with fsspec.
        # Note that the VideoWriter fails to write to the file on Windows if
        # the file is already open, thus we can't use NamedTemporaryFile.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file = Path(tmp_dir) / self._filepath.name
            self._write_to_filepath(data, str(tmp_file))
            with fsspec.open(
                f"{self._protocol}://{self._filepath}",
                "wb",
                **self._storage_options,
            ) as f_target:
                # The temp file is only read, so open it read-only.
                with tmp_file.open("rb") as f_tmp:
                    # Copy in chunks instead of reading the whole encoded
                    # video into memory at once.
                    shutil.copyfileobj(f_tmp, f_target)