from __future__ import annotations
import datetime
import os
import ssl
import tempfile
import warnings
import dask
from dask.widgets import get_template
__all__ = ("Security",)
if ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7):
# The OP_NO_SSL* and OP_NO_TLS* become deprecated in favor of
# 'SSLContext.minimum_version' from Python 3.7 onwards, however
# this attribute is not available unless the ssl module is compiled
# with OpenSSL 1.1.0g or newer.
# these _set_mimimun_version and _set_maximum_version depend on the validation
# already performed in `Security._set_tls_version_field`,
# and that they only apply to freshly created ssl.SSLContext instances in
# _get_tls_context
def _set_minimum_version(ctx: ssl.SSLContext, version: ssl.TLSVersion) -> None:
ctx.minimum_version = version
def _set_maximum_version(ctx: ssl.SSLContext, version: ssl.TLSVersion) -> None:
ctx.maximum_version = version
def _set_minimum_version(ctx: ssl.SSLContext, version: ssl.TLSVersion) -> None:
# if the ctx.maximum_version attribute is unsupported then we can infer
# that TLS 1.3 is not supported.
# _set_tls_version_field enforces that version is TLSVersion.TLSv1_2,
# or TLSVersion.TLSv1_3
if version is not ssl.TLSVersion.TLSv1_2:
raise ValueError(f"Unsupported TLS/SSL version: {version!r}")
ctx.options |= (
ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
def _set_maximum_version(ctx: ssl.SSLContext, version: ssl.TLSVersion) -> None:
# if the ctx.maximum_version attribute is unsupported then we can infer
# that TLSv1_3 is not supported.
# _set_tls_version_field enforces that version is TLSVersion.TLSv1_2,
# TLSVersion.TLSv1_3, or None
# _get_tls_context enforces that version is not None
if version is not ssl.TLSVersion.TLSv1_2:
raise ValueError(f"Unsupported TLS/SSL version: {version!r}")
[docs]class Security:
"""Security configuration for a Dask cluster.
Default values are loaded from Dask's configuration files, and can be
overridden in the constructor.
require_encryption : bool, optional
Whether TLS encryption is required for all connections.
tls_ca_file : str, optional
Path to a CA certificate file encoded in PEM format.
tls_ciphers : str, optional
An OpenSSL cipher string of allowed ciphers. If not provided, the
system defaults will be used.
tls_min_version : ssl.TLSVersion, optional
The minimum TLS version to support. Defaults to TLS 1.2.
tls_max_version : ssl.TLSVersion, optional
The maximum TLS version to support. Defaults to the maximum version
tls_client_cert : str, optional
Path to a certificate file for the client, encoded in PEM format.
tls_client_key : str, optional
Path to a key file for the client, encoded in PEM format.
Alternatively, the key may be appended to the cert file, and this
parameter be omitted.
tls_scheduler_cert : str, optional
Path to a certificate file for the scheduler, encoded in PEM format.
tls_scheduler_key : str, optional
Path to a key file for the scheduler, encoded in PEM format.
Alternatively, the key may be appended to the cert file, and this
parameter be omitted.
tls_worker_cert : str, optional
Path to a certificate file for a worker, encoded in PEM format.
tls_worker_key : str, optional
Path to a key file for a worker, encoded in PEM format.
Alternatively, the key may be appended to the cert file, and this
parameter be omitted.
extra_conn_args : mapping, optional
Mapping with keyword arguments to pass down to connections.
__slots__ = (
def __init__(self, require_encryption=None, **kwargs):
if ssl.OPENSSL_VERSION_INFO < (1, 1, 1):
f"support for {ssl.OPENSSL_VERSION} is deprecated,"
" and will be removed in a future release",
extra = set(kwargs).difference(self.__slots__)
if extra:
raise TypeError("Unknown parameters: %r" % sorted(extra))
self.extra_conn_args = kwargs.pop("extra_conn_args", {})
if require_encryption is None:
require_encryption = dask.config.get("distributed.comm.require-encryption")
if require_encryption is None:
require_encryption = bool(kwargs)
self.require_encryption = require_encryption
self._set_field(kwargs, "tls_ciphers", "distributed.comm.tls.ciphers")
self._set_field(kwargs, "tls_ca_file", "")
self._set_field(kwargs, "tls_client_key", "distributed.comm.tls.client.key")
self._set_field(kwargs, "tls_client_cert", "distributed.comm.tls.client.cert")
kwargs, "tls_scheduler_key", "distributed.comm.tls.scheduler.key"
kwargs, "tls_scheduler_cert", "distributed.comm.tls.scheduler.cert"
self._set_field(kwargs, "tls_worker_key", "distributed.comm.tls.worker.key")
self._set_field(kwargs, "tls_worker_cert", "distributed.comm.tls.worker.cert")
[docs] @classmethod
def temporary(cls, **kwargs):
"""Create a new temporary Security object.
This creates a new self-signed key/cert pair suitable for securing
communication for all roles in a Dask cluster. These keys/certs exist
only in memory, and are stored in this object.
This method requires the library ``cryptography`` be installed.
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
except ImportError:
raise ImportError(
"Using `Security.temporary` requires `cryptography`, please "
"install it using either pip or conda"
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
key_contents = key.private_bytes(
dask_internal = x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, "dask-internal")]
altnames = x509.SubjectAlternativeName([x509.DNSName("dask-internal")])
now =
cert = (
.add_extension(altnames, critical=False)
.not_valid_after(now + datetime.timedelta(days=365))
.sign(key, hashes.SHA256(), default_backend())
cert_contents = cert.public_bytes(serialization.Encoding.PEM).decode()
return cls(
def _set_field(self, kwargs, field, config_name):
if field in kwargs:
val = kwargs[field]
val = dask.config.get(config_name)
setattr(self, field, val)
def _set_tls_version_field(self, kwargs, field, config_name, default=None):
if field in kwargs:
val = kwargs[field]
valid = {None, ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
if val not in valid:
raise ValueError(
f"{field}={val!r} is not supported, expected one of {list(valid)}"
if val is None:
val = default
valid = {
None: default,
1.2: ssl.TLSVersion.TLSv1_2,
1.3: ssl.TLSVersion.TLSv1_3,
val = dask.config.get(config_name)
if val in valid:
val = valid[val]
raise ValueError(
f"{config_name}={val!r} is not supported, expected one of {list(valid)}"
setattr(self, field, val)
def _attr_to_dict(self):
keys = sorted(self.__slots__)
attr = {}
for k in keys:
val = getattr(self, k)
if val is not None:
if isinstance(val, str) and "\n" in val:
attr[k] = "Temporary (In-memory)"
elif isinstance(val, str):
attr[k] = f"Local ({os.path.abspath(val)})"
attr[k] = val
return attr
def __repr__(self):
attr = self._attr_to_dict()
return (
+ ", ".join(f"{key}={value}" for key, value in attr.items())
+ ")"
def _repr_html_(self):
return get_template("security.html.j2").render(security=self._attr_to_dict())
[docs] def get_tls_config_for_role(self, role):
Return the TLS configuration for the given role, as a flat dict.
if role not in {"client", "scheduler", "worker"}:
raise ValueError(f"unknown role {role!r}")
return {
"ca_file": self.tls_ca_file,
"ciphers": self.tls_ciphers,
"cert": getattr(self, "tls_%s_cert" % role),
"key": getattr(self, "tls_%s_key" % role),
def _get_tls_context(self, tls, purpose):
if tls.get("ca_file") and tls.get("cert"):
ca = tls["ca_file"]
cert_path = cert = tls["cert"]
key_path = key = tls.get("key")
if "\n" in ca:
ctx = ssl.create_default_context(purpose=purpose, cadata=ca)
ctx = ssl.create_default_context(purpose=purpose, cafile=ca)
# the _set_tls_version_field method enforces that
# self.tls_min_version is TLSv1_2, or TLSv1_3
_set_minimum_version(ctx, self.tls_min_version)
if self.tls_max_version is not None:
_set_maximum_version(ctx, self.tls_max_version)
cert_in_memory = "\n" in cert
key_in_memory = key is not None and "\n" in key
if cert_in_memory or key_in_memory:
with tempfile.TemporaryDirectory() as tempdir:
if cert_in_memory:
cert_path = os.path.join(tempdir, "dask.crt")
with open(cert_path, "w") as f:
if key_in_memory:
key_path = os.path.join(tempdir, "dask.pem")
with open(key_path, "w") as f:
ctx.load_cert_chain(cert_path, key_path)
ctx.load_cert_chain(cert_path, key_path)
# Bidirectional authentication
ctx.verify_mode = ssl.CERT_REQUIRED
# We expect a dedicated CA for the cluster and people using
# IP addresses rather than hostnames
ctx.check_hostname = False
ctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
if tls.get("ciphers"):
return ctx
[docs] def get_connection_args(self, role):
Get the *connection_args* argument for a connect() call with
the given *role*.
tls = self.get_tls_config_for_role(role)
return {
"ssl_context": self._get_tls_context(tls, ssl.Purpose.SERVER_AUTH),
"require_encryption": self.require_encryption,
"extra_conn_args": self.extra_conn_args,
[docs] def get_listen_args(self, role):
Get the *connection_args* argument for a listen() call with
the given *role*.
tls = self.get_tls_config_for_role(role)
return {
"ssl_context": self._get_tls_context(tls, ssl.Purpose.CLIENT_AUTH),
"require_encryption": self.require_encryption,