Skip to content

EmailMessageDataset

EmailMessageDataset loads/saves an email message from/to a file using an underlying filesystem (e.g.: local, S3, GCS). It uses the email package in the standard library to manage email messages.

kedro_datasets.email.EmailMessageDataset

EmailMessageDataset(
    *,
    filepath,
    load_args=None,
    save_args=None,
    version=None,
    credentials=None,
    fs_args=None,
    metadata=None
)

Bases: AbstractVersionedDataset[Message, Message]

EmailMessageDataset loads/saves an email message from/to a file using an underlying filesystem (e.g.: local, S3, GCS). It uses the email package in the standard library to manage email messages.

Note that EmailMessageDataset doesn't handle sending email messages.

Examples:

Using the Python API:

>>> from email.message import EmailMessage
>>>
>>> from kedro_datasets.email import EmailMessageDataset
>>>
>>> string_to_write = "what would you do if you were invisable for one day????"
>>>
>>> # Create a text/plain message
>>> msg = EmailMessage()
>>> msg.set_content(string_to_write)
>>> msg["Subject"] = "invisibility"
>>> msg["From"] = '"sin studly17"'
>>> msg["To"] = '"strong bad"'
>>>
>>> dataset = EmailMessageDataset(filepath=tmp_path / "test")
>>> dataset.save(msg)
>>> reloaded = dataset.load()
>>> assert msg.__dict__ == reloaded.__dict__

Parameters:

  • filepath (str | PathLike) –

    Filepath in POSIX format to a text file prefixed with a protocol like s3://. If prefix is not provided, file protocol (local filesystem) will be used. The prefix should be any protocol supported by fsspec. Note: http(s) doesn't support versioning.

  • load_args (dict[str, Any] | None, default: None ) –

    email options for parsing email messages (arguments passed into email.parser.Parser.parse). Here you can find all available arguments: https://docs.python.org/3/library/email.parser.html#email.parser.Parser.parse If you would like to specify options for the Parser, you can include them under the "parser" key. Here you can find all available arguments: https://docs.python.org/3/library/email.parser.html#email.parser.Parser All defaults are preserved, but "policy", which is set to email.policy.default.

  • save_args (dict[str, Any] | None, default: None ) –

    email options for generating MIME documents (arguments passed into email.generator.Generator.flatten). Here you can find all available arguments: https://docs.python.org/3/library/email.generator.html#email.generator.Generator.flatten If you would like to specify options for the Generator, you can include them under the "generator" key. Here you can find all available arguments: https://docs.python.org/3/library/email.generator.html#email.generator.Generator All defaults are preserved.

  • version (Version | None, default: None ) –

    If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, save version will be autogenerated.

  • credentials (dict[str, Any] | None, default: None ) –

    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.

  • fs_args (dict[str, Any] | None, default: None ) –

    Extra arguments to pass into underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem), as well as to pass to the filesystem's open method through nested keys open_args_load and open_args_save. Here you can find all available arguments for open: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open All defaults are preserved, except mode, which is set to r when loading and to w when saving.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Source code in kedro_datasets/email/message_dataset.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(  # noqa: PLR0913
    self,
    *,
    filepath: str | os.PathLike,
    load_args: dict[str, Any] | None = None,
    save_args: dict[str, Any] | None = None,
    version: Version | None = None,
    credentials: dict[str, Any] | None = None,
    fs_args: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Creates a new instance of ``EmailMessageDataset`` pointing to a concrete text file
    on a specific filesystem.

    Args:
        filepath: Filepath in POSIX format to a text file prefixed with a protocol like `s3://`.
            If prefix is not provided, `file` protocol (local filesystem) will be used.
            The prefix should be any protocol supported by ``fsspec``.
            Note: `http(s)` doesn't support versioning.
        load_args: ``email`` options for parsing email messages (arguments passed
            into ``email.parser.Parser.parse``). Here you can find all available arguments:
            https://docs.python.org/3/library/email.parser.html#email.parser.Parser.parse
            If you would like to specify options for the `Parser`,
            you can include them under the "parser" key. Here you can
            find all available arguments:
            https://docs.python.org/3/library/email.parser.html#email.parser.Parser
            All defaults are preserved, but "policy", which is set to ``email.policy.default``.
        save_args: ``email`` options for generating MIME documents (arguments passed into
            ``email.generator.Generator.flatten``). Here you can find all available arguments:
            https://docs.python.org/3/library/email.generator.html#email.generator.Generator.flatten
            If you would like to specify options for the `Generator`,
            you can include them under the "generator" key. Here you can
            find all available arguments:
            https://docs.python.org/3/library/email.generator.html#email.generator.Generator
            All defaults are preserved.
        version: If specified, should be an instance of
            ``kedro.io.core.Version``. If its ``load`` attribute is
            None, the latest version will be loaded. If its ``save``
            attribute is None, save version will be autogenerated.
        credentials: Credentials required to get access to the underlying filesystem.
            E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
        fs_args: Extra arguments to pass into underlying filesystem class constructor
            (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
            to pass to the filesystem's `open` method through nested keys
            `open_args_load` and `open_args_save`.
            Here you can find all available arguments for `open`:
            https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
            All defaults are preserved, except `mode`, which is set to `r` when loading
            and to `w` when saving.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.
    """
    _fs_args = deepcopy(fs_args) or {}
    _fs_open_args_load = _fs_args.pop("open_args_load", {})
    _fs_open_args_save = _fs_args.pop("open_args_save", {})
    _credentials = deepcopy(credentials) or {}

    protocol, path = get_protocol_and_path(filepath, version)

    self._protocol = protocol
    if protocol == "file":
        _fs_args.setdefault("auto_mkdir", True)
    self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

    self.metadata = metadata

    super().__init__(
        filepath=PurePosixPath(path),
        version=version,
        exists_function=self._fs.exists,
        glob_function=self._fs.glob,
    )

    # Handle default load and save and fs arguments
    self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
    self._parser_args = self._load_args.pop("parser", {"policy": default})

    self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}
    self._generator_args = self._save_args.pop("generator", {})

    self._fs_open_args_load = {
        **self.DEFAULT_FS_ARGS.get("open_args_load", {}),
        **(_fs_open_args_load or {}),
    }
    self._fs_open_args_save = {
        **self.DEFAULT_FS_ARGS.get("open_args_save", {}),
        **(_fs_open_args_save or {}),
    }

DEFAULT_FS_ARGS class-attribute instance-attribute

DEFAULT_FS_ARGS = {
    "open_args_save": {"mode": "w"},
    "open_args_load": {"mode": "r"},
}

DEFAULT_LOAD_ARGS class-attribute instance-attribute

DEFAULT_LOAD_ARGS = {}

DEFAULT_SAVE_ARGS class-attribute instance-attribute

DEFAULT_SAVE_ARGS = {}

_fs instance-attribute

_fs = filesystem(_protocol, **_credentials, **_fs_args)

_fs_open_args_load instance-attribute

_fs_open_args_load = {
    None: get("open_args_load", {}),
    None: _fs_open_args_load or {},
}

_fs_open_args_save instance-attribute

_fs_open_args_save = {
    None: get("open_args_save", {}),
    None: _fs_open_args_save or {},
}

_generator_args instance-attribute

_generator_args = pop('generator', {})

_load_args instance-attribute

_load_args = {
    None: DEFAULT_LOAD_ARGS,
    None: load_args or {},
}

_parser_args instance-attribute

_parser_args = pop('parser', {'policy': default})

_protocol instance-attribute

_protocol = protocol

_save_args instance-attribute

_save_args = {
    None: DEFAULT_SAVE_ARGS,
    None: save_args or {},
}

metadata instance-attribute

metadata = metadata

_describe

_describe()
Source code in kedro_datasets/email/message_dataset.py
152
153
154
155
156
157
158
159
160
161
def _describe(self) -> dict[str, Any]:
    return {
        "filepath": self._filepath,
        "protocol": self._protocol,
        "load_args": self._load_args,
        "parser_args": self._parser_args,
        "save_args": self._save_args,
        "generator_args": self._generator_args,
        "version": self._version,
    }

_exists

_exists()
Source code in kedro_datasets/email/message_dataset.py
177
178
179
180
181
182
183
def _exists(self) -> bool:
    try:
        load_path = get_filepath_str(self._get_load_path(), self._protocol)
    except DatasetError:
        return False

    return self._fs.exists(load_path)

_invalidate_cache

_invalidate_cache()

Invalidate underlying filesystem caches.

Source code in kedro_datasets/email/message_dataset.py
189
190
191
192
def _invalidate_cache(self) -> None:
    """Invalidate underlying filesystem caches."""
    filepath = get_filepath_str(self._filepath, self._protocol)
    self._fs.invalidate_cache(filepath)

_release

_release()
Source code in kedro_datasets/email/message_dataset.py
185
186
187
def _release(self) -> None:
    super()._release()
    self._invalidate_cache()

load

load()
Source code in kedro_datasets/email/message_dataset.py
163
164
165
166
167
def load(self) -> Message:
    load_path = get_filepath_str(self._get_load_path(), self._protocol)

    with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
        return Parser(**self._parser_args).parse(fs_file, **self._load_args)

save

save(data)
Source code in kedro_datasets/email/message_dataset.py
169
170
171
172
173
174
175
def save(self, data: Message) -> None:
    save_path = get_filepath_str(self._get_save_path(), self._protocol)

    with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
        Generator(fs_file, **self._generator_args).flatten(data, **self._save_args)

    self._invalidate_cache()