#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Azure Blob Storage client.
"""
# pytype: skip-file
import errno
import io
import logging
import os
import re
import tempfile
import time
from apache_beam.internal.azure import auth
from apache_beam.io.filesystemio import Downloader
from apache_beam.io.filesystemio import DownloaderStream
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.options.pipeline_options import AzureOptions
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
_LOGGER = logging.getLogger(__name__)
try:
# pylint: disable=wrong-import-order, wrong-import-position
# pylint: disable=ungrouped-imports
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import (
BlobServiceClient,
ContentSettings,
)
AZURE_DEPS_INSTALLED = True
except ImportError:
AZURE_DEPS_INSTALLED = False
# Default read buffer size in bytes (16 MiB); used as the default value of
# ``read_buffer_size`` in ``BlobStorageIO.open``.
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
# Upper bound on how many blobs ``BlobStorageIO.delete_files`` hands to a
# single ``_delete_batch`` call.
MAX_BATCH_OPERATION_SIZE = 100
[docs]
def parse_azfs_path(azfs_path, blob_optional=False, get_account=False):
  """Splits an azfs:// path into its components.

  Args:
    azfs_path: Path in the form
      ``azfs://<storage-account>/<container>/<path>``.
    blob_optional: When True, an empty blob part is accepted.
    get_account: When True, the storage account is included in the result.

  Returns:
    ``(container, blob)``, or ``(storage_account, container, blob)`` when
    ``get_account`` is True.

  Raises:
    ValueError: If the path does not match the expected form, or the blob
      part is empty while ``blob_optional`` is False.
  """
  pattern = (
      '^azfs://([a-z0-9]{3,24})/([a-z0-9](?![a-z0-9-]*--[a-z0-9-]*)'
      '[a-z0-9-]{1,61}[a-z0-9])/(.*)$')
  parsed = re.match(pattern, azfs_path)
  blob_missing = parsed is not None and parsed.group(3) == ''
  if parsed is None or (blob_missing and not blob_optional):
    raise ValueError(
        'Azure Blob Storage path must be in the form '
        'azfs://<storage-account>/<container>/<path>.')
  storage_account, container, blob = parsed.groups()
  if get_account:
    return storage_account, container, blob
  return container, blob
[docs]
def get_azfs_url(storage_account, container, blob=''):
  """Builds the HTTPS URL of a blob.

  The result has the form
  ``https://<account>.blob.core.windows.net/<container>/<blob-name>``.

  Args:
    storage_account: Storage account name.
    container: Container name.
    blob: Blob name; defaults to the empty string, which leaves a trailing
      slash after the container.
  """
  url_pieces = [
      'https://',
      storage_account,
      '.blob.core.windows.net/',
      container,
      '/',
      blob,
  ]
  return ''.join(url_pieces)
[docs]
class Blob:
  """Plain record describing a blob in Azure Blob Storage.

  The constructor stores its arguments unchanged on same-named attributes:
  ``etag``, ``name``, ``last_updated``, ``size`` and ``mime_type``.
  """
  def __init__(self, etag, name, last_updated, size, mime_type):
    self.etag = etag
    self.name = name
    self.last_updated = last_updated
    self.size = size
    self.mime_type = mime_type
[docs]
class BlobStorageIOError(IOError, retry.PermanentException):
  """Blob Storage IO error that should not be retried."""
[docs]
class BlobStorageError(Exception):
  """Error raised by the Blob Storage client.

  Attributes are set directly from the constructor arguments: ``message``
  is the error description and ``code`` the associated status code. Both
  default to None.
  """
  def __init__(self, message=None, code=None):
    self.message, self.code = message, code
[docs]
class BlobStorageIO(object):
"""Azure Blob Storage I/O client."""
def __init__(self, client=None, pipeline_options=None):
  """Sets up the blob service client used by every operation.

  Args:
    client: Optional ready-made blob service client. When given it is used
      as-is and ``pipeline_options`` is not consulted.
    pipeline_options: Pipeline options, viewed as ``AzureOptions`` to find
      the connection string or the blob service endpoint. Needed when
      ``client`` is None.

  Raises:
    RuntimeError: If the Azure dependencies are not installed.
  """
  # Check the dependencies first: without the SDK, ``BlobServiceClient`` is
  # never imported, so building a client below would fail with a NameError
  # instead of this explicit message.
  if not AZURE_DEPS_INSTALLED:
    raise RuntimeError('Azure dependencies are not installed. Unable to run.')
  if client is not None:
    self.client = client
    return
  azure_options = pipeline_options.view_as(AzureOptions)
  # The pipeline option wins over the environment variable.
  connect_str = (
      azure_options.azure_connection_string or
      os.getenv('AZURE_STORAGE_CONNECTION_STRING'))
  if connect_str:
    self.client = BlobServiceClient.from_connection_string(
        conn_str=connect_str)
  else:
    credential = auth.get_service_credentials(pipeline_options)
    self.client = BlobServiceClient(
        account_url=azure_options.blob_service_endpoint,
        credential=credential)
[docs]
def open(
    self,
    filename,
    mode='r',
    read_buffer_size=DEFAULT_READ_BUFFER_SIZE,
    mime_type='application/octet-stream'):
  """Opens an Azure Blob Storage path as a buffered file object.

  Args:
    filename (str): Azure Blob Storage file path in the form
      ``azfs://<storage-account>/<container>/<path>``.
    mode (str): ``'r'``/``'rb'`` for reading or ``'w'``/``'wb'`` for
      writing.
    read_buffer_size (int): Buffer size to use during read operations.
    mime_type (str): Mime type to set for write operations.

  Returns:
    An ``io.BufferedReader`` (read modes) or ``io.BufferedWriter`` (write
    modes) wrapping the blob.

  Raises:
    ValueError: Invalid open file mode.
  """
  if mode in ('r', 'rb'):
    downloader = BlobStorageDownloader(
        self.client, filename, buffer_size=read_buffer_size)
    raw_reader = DownloaderStream(
        downloader, read_buffer_size=read_buffer_size, mode=mode)
    return io.BufferedReader(raw_reader, buffer_size=read_buffer_size)
  if mode in ('w', 'wb'):
    uploader = BlobStorageUploader(self.client, filename, mime_type)
    raw_writer = UploaderStream(uploader, mode=mode)
    return io.BufferedWriter(raw_writer, buffer_size=128 * 1024)
  raise ValueError('Invalid file open mode: %s.' % mode)
[docs]
@retry.with_exponential_backoff(
    retry_filter=retry.retry_on_beam_io_error_filter)
def copy(self, src, dest):
  """Copies a single Azure Blob Storage blob from src to dest.

  The destination blob is obtained from this object's client; only the
  container and blob name of ``dest`` are used, not its storage account.

  Args:
    src: Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].
    dest: Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].

  Raises:
    ValueError: If ``src`` or ``dest`` is not a valid azfs:// blob path.
    BlobStorageError: If the copy request fails with
      ``ResourceNotFoundError``; carries that error's reason and status
      code.
  """
  src_storage_account, src_container, src_blob = parse_azfs_path(
      src, get_account=True)
  dest_container, dest_blob = parse_azfs_path(dest)
  source_blob = get_azfs_url(src_storage_account, src_container, src_blob)
  copied_blob = self.client.get_blob_client(dest_container, dest_blob)
  try:
    copied_blob.start_copy_from_url(source_blob)
  except ResourceNotFoundError as e:
    # Chain explicitly so the SDK error stays attached as the cause.
    raise BlobStorageError(e.reason, e.status_code) from e
# We intentionally do not decorate this method with a retry, since the
# underlying copy operation is already an idempotent operation protected
# by retry decorators.
[docs]
def copy_tree(self, src, dest):
  """Copies every blob listed under the src directory to dest.

  Each blob returned by ``list_prefix(src)`` is copied to ``dest`` plus the
  blob's path relative to ``src``.

  Args:
    src: Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name], ending with "/".
    dest: Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name], ending with "/".

  Returns:
    List of tuples of (src, dest, exception) where exception is None if the
    operation succeeded or the relevant exception if the operation failed.
  """
  assert src.endswith('/')
  assert dest.endswith('/')
  outcomes = []
  prefix_length = len(src)
  for source_path in self.list_prefix(src):
    target_path = dest + source_path[prefix_length:]
    try:
      self.copy(source_path, target_path)
    except BlobStorageError as error:
      outcomes.append((source_path, target_path, error))
    else:
      outcomes.append((source_path, target_path, None))
  return outcomes
# We intentionally do not decorate this method with a retry, since the
# underlying copy operation is already an idempotent operation protected
# by retry decorators.
[docs]
def copy_paths(self, src_dest_pairs):
  """Copies blobs or whole directories from src to dest.

  A pair whose paths both end with "/" is copied with ``copy_tree``; a pair
  where neither does is copied with ``copy``. A pair mixing the two kinds
  is reported as an error without attempting a copy.

  Args:
    src_dest_pairs: List of (src, dest) tuples of
      azfs://<storage-account>/<container>/[name] file paths
      to copy from src to dest.

  Returns:
    List of tuples of (src, dest, exception), where exception is None if
    the operation succeeded or the relevant exception if it failed. A
    directory pair contributes one tuple per blob copied by ``copy_tree``.
  """
  if not src_dest_pairs:
    return []
  outcomes = []
  for src_path, dest_path in src_dest_pairs:
    src_is_dir = src_path.endswith('/')
    dest_is_dir = dest_path.endswith('/')
    if src_is_dir != dest_is_dir:
      # One directory and one blob: record the mismatch and move on.
      mismatch = BlobStorageError(
          "Unable to copy mismatched paths" +
          "(directory, non-directory): %s, %s" % (src_path, dest_path),
          400)
      outcomes.append((src_path, dest_path, mismatch))
      continue
    try:
      if src_is_dir:
        outcomes.extend(self.copy_tree(src_path, dest_path))
      else:
        self.copy(src_path, dest_path)
        outcomes.append((src_path, dest_path, None))
    except BlobStorageError as error:
      outcomes.append((src_path, dest_path, error))
  return outcomes
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
# protected by retry decorators.
[docs]
def rename(self, src, dest):
  """Moves a single blob by copying it to dest and then deleting src.

  Args:
    src: Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].
    dest: Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].
  """
  # Copy before delete: if the copy raises, the source is never deleted.
  self.copy(src, dest)
  self.delete(src)
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
# protected by retry decorators.
[docs]
def rename_files(self, src_dest_pairs):
  """Renames blobs by copying each one and deleting the copied sources.

  Only sources whose copy succeeded are deleted.

  Args:
    src_dest_pairs: List of (src, dest) tuples of
      azfs://<storage-account>/<container>/[name]
      file paths to rename from src to dest.

  Returns:
    List of tuples of (src, dest, exception) in the order produced by
    ``copy_paths``. exception is the copy error if the copy failed,
    otherwise the delete result for src, which is None on success.

  Raises:
    ValueError: If any src or dest path ends with "/".
  """
  if not src_dest_pairs:
    return []
  if any(s.endswith('/') or d.endswith('/') for s, d in src_dest_pairs):
    raise ValueError('Unable to rename a directory.')
  copy_results = self.copy_paths(src_dest_pairs)
  copied_sources = [
      source for source, _, copy_error in copy_results if copy_error is None
  ]
  # Keyed by source path so each copy result finds its delete outcome.
  delete_errors = dict(self.delete_files(copied_sources))
  renamed = []
  for source, target, copy_error in copy_results:
    if copy_error is not None:
      outcome = copy_error
    else:
      outcome = delete_errors[source]
    renamed.append((source, target, outcome))
  return renamed
[docs]
def exists(self, path):
  """Returns whether the given Azure Blob Storage blob exists.

  Args:
    path: Azure Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].

  Returns:
    True if the blob's properties could be fetched, False if the lookup
    failed with status code 404.

  Raises:
    BlobStorageError: If the lookup failed with any other status code.
  """
  try:
    self._blob_properties(path)
    return True
  except BlobStorageError as e:
    # ``_blob_properties`` converts the SDK's ResourceNotFoundError into
    # BlobStorageError, so that is the type to catch here; its ``code``
    # carries the HTTP status.
    if e.code == 404:
      # HTTP 404 indicates that the file did not exist.
      return False
    # We re-raise all other exceptions.
    raise
[docs]
def size(self, path):
  """Returns the size in bytes of a single Blob Storage blob.

  No glob expansion is performed, so ``path`` must name exactly one blob.

  Args:
    path: Azure Blob Storage file path of the blob.

  Returns:
    The ``size`` field of the blob's properties.
  """
  properties = self._blob_properties(path)
  return properties.size
[docs]
def last_updated(self, path):
  """Returns the last updated epoch time of a single blob, in seconds.

  No glob expansion is performed, so ``path`` must name exactly one blob.

  Args:
    path: Azure Blob Storage file path of the blob.

  Returns:
    The blob's ``last_modified`` property converted by
    ``_updated_to_seconds``.
  """
  modified_at = self._blob_properties(path).last_modified
  return self._updated_to_seconds(modified_at)
[docs]
def checksum(self, path):
  """Looks up the checksum of an Azure Blob Storage blob.

  Args:
    path: Azure Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].

  Returns:
    The ``etag`` field of the blob's properties.
  """
  properties = self._blob_properties(path)
  return properties.etag
def _status(self, path):
  """For internal use only; no backwards-compatibility guarantees.

  Collects the supported fields of a single blob in one properties lookup.
  No glob expansion is performed, so ``path`` must name exactly one blob.

  Returns:
    dict with the keys ``checksum``, ``last_updated`` and ``size``; a key
    is omitted when the properties object lacks the matching attribute.
  """
  properties = self._blob_properties(path)
  # (result key, properties attribute, optional conversion).
  field_sources = (
      ('checksum', 'etag', None),
      ('last_updated', 'last_modified', self._updated_to_seconds),
      ('size', 'size', None),
  )
  file_status = {}
  for key, attribute, convert in field_sources:
    if not hasattr(properties, attribute):
      continue
    value = getattr(properties, attribute)
    file_status[key] = value if convert is None else convert(value)
  return file_status
@retry.with_exponential_backoff(
    retry_filter=retry.retry_on_beam_io_error_filter)
def _blob_properties(self, path):
  """Returns the blob properties object for the given path.

  No glob expansion is performed, so ``path`` must name exactly one blob.

  Args:
    path: Azure Blob Storage file path of the blob.

  Returns:
    The result of ``get_blob_properties()`` for the blob.

  Raises:
    BlobStorageError: If the lookup fails with ``ResourceNotFoundError``;
      carries that error's reason and status code.
  """
  container, blob = parse_azfs_path(path)
  blob_to_check = self.client.get_blob_client(container, blob)
  try:
    return blob_to_check.get_blob_properties()
  except ResourceNotFoundError as e:
    # Chain explicitly so the SDK error stays attached as the cause.
    raise BlobStorageError(e.reason, e.status_code) from e
@staticmethod
def _updated_to_seconds(updated):
  """Converts a ``datetime`` to seconds since the epoch, as a float.

  Aware datetimes are converted to UTC first; naive datetimes are taken to
  be in UTC already. The result does not depend on the local timezone or
  on daylight saving time.

  Args:
    updated: ``datetime.datetime`` to convert.

  Returns:
    Seconds since the Unix epoch, including the microsecond fraction.
  """
  # Imported locally so this helper does not depend on a module-level import.
  import calendar

  # ``time.mktime`` reads the tuple as local time, and subtracting
  # ``time.timezone`` only undoes the non-DST offset; ``timegm`` treats the
  # tuple as UTC directly.
  whole_seconds = calendar.timegm(updated.utctimetuple())
  return whole_seconds + updated.microsecond / 1000000.0
[docs]
@retry.with_exponential_backoff(
    retry_filter=retry.retry_on_beam_io_error_filter)
def delete(self, path):
  """Deletes a single blob at the given Azure Blob Storage path.

  A ``ResourceNotFoundError`` with status code 404 is treated as success,
  so deleting a blob that is already gone does not fail.

  Args:
    path: Azure Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].

  Raises:
    ResourceNotFoundError: If raised with a status code other than 404.
  """
  container, blob = parse_azfs_path(path)
  blob_client = self.client.get_blob_client(container, blob)
  try:
    blob_client.delete_blob()
  except ResourceNotFoundError as e:
    if e.status_code != 404:
      logging.error('HTTP error while deleting file %s', path)
      raise e
    # 404: the blob no longer exists; fall through and report success.
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
# protected by retry decorators.
[docs]
def delete_paths(self, paths):
  """Deletes the given Azure Blob Storage blobs and directories.

  Paths ending with "/" are deleted with ``delete_tree``; all others are
  deleted together with ``delete_files``.

  Args:
    paths: list of Azure Blob Storage paths in the form
      azfs://<storage-account>/<container>/[name] that give the
      file blobs to be deleted.

  Returns:
    dict mapping each deleted blob path to its result, as reported by
    ``delete_tree`` / ``delete_files``. Directory results are inserted
    first, then the individual blobs.
  """
  directories, blobs = [], []
  for path in paths:
    bucket = directories if path.endswith('/') else blobs
    bucket.append(path)
  results = {}
  for directory in directories:
    results.update(self.delete_tree(directory))
  results.update(self.delete_files(blobs))
  return results
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
# protected by retry decorators.
[docs]
def delete_tree(self, root):
  """Deletes all blobs under the given Azure Blob Storage virtual directory.

  Args:
    root: Azure Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name]
      (ending with a "/").

  Returns:
    The result of ``delete_files`` for every path that ``list_prefix(root)``
    reports: a list of (path, result) tuples.
  """
  assert root.endswith('/')
  return self.delete_files(self.list_prefix(root))
# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
# protected by retry decorators.
[docs]
def delete_files(self, paths):
  """Deletes the given Azure Blob Storage blobs.

  Blobs are grouped by container and removed in chunks of at most
  ``MAX_BATCH_OPERATION_SIZE`` through ``_delete_batch``.

  Args:
    paths: list of Azure Blob Storage paths in the form
      azfs://<storage-account>/<container>/[name] that give the
      file blobs to be deleted.

  Returns:
    List of tuples of (path, result) in the same order as ``paths``, where
    result is the value ``_delete_batch`` recorded for that blob.
  """
  if not paths:
    return []
  # One (container, blob) key per path; the same keys index the outcomes.
  blob_keys = [parse_azfs_path(path, get_account=False) for path in paths]
  blobs_by_container = {}
  for container, blob in blob_keys:
    blobs_by_container.setdefault(container, []).append(blob)
  outcomes = {}
  for container, container_blobs in blobs_by_container.items():
    for start in range(0, len(container_blobs), MAX_BATCH_OPERATION_SIZE):
      batch = container_blobs[start:start + MAX_BATCH_OPERATION_SIZE]
      outcomes.update(self._delete_batch(container, batch))
  return [(path, outcomes[key]) for path, key in zip(paths, blob_keys)]
@retry.with_exponential_backoff(
    retry_filter=retry.retry_on_beam_io_error_filter)
def _delete_batch(self, container, blobs):
  """Deletes several blobs that live in the same container.

  Args:
    container: container name.
    blobs: list of blobs to be deleted.

  Returns:
    Dictionary of the form {(container, blob): result}. result is whatever
    the container client's ``delete_blob`` returned, or the status code of
    the ``ResourceNotFoundError`` raised for that blob.
  """
  container_client = self.client.get_container_client(container)
  outcomes = {}
  for blob in blobs:
    key = (container, blob)
    try:
      outcomes[key] = container_client.delete_blob(blob)
    except ResourceNotFoundError as e:
      outcomes[key] = e.status_code
  return outcomes
[docs]
@deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
  """Lists files matching the prefix, collected into a dict.

  Args:
    path: Azure Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].
    with_metadata: Experimental. Specify whether returns file metadata.

  Returns:
    If ``with_metadata`` is False: dict of file name -> size; if
    ``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
  """
  # ``list_files`` yields (file name, info) pairs.
  return dict(self.list_files(path, with_metadata))
[docs]
def list_files(self, path, with_metadata=False):
  """Lazily lists files matching the prefix.

  Args:
    path: Azure Blob Storage file path pattern in the form
      azfs://<storage-account>/<container>/[name].
    with_metadata: Experimental. Specify whether returns file metadata.

  Returns:
    If ``with_metadata`` is False: generator of tuple(file name, size); if
    ``with_metadata`` is True: generator of
    tuple(file name, tuple(size, timestamp)).
  """
  storage_account, container, blob = parse_azfs_path(
      path, blob_optional=True, get_account=True)
  seen_names = set()
  listed = 0
  started_at = time.time()
  if with_metadata:
    logging.debug("Starting the file information of the input")
  else:
    logging.debug("Starting the size estimation of the input")
  container_client = self.client.get_container_client(container)
  list_blobs_with_retry = retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)(
          container_client.list_blobs)
  for item in list_blobs_with_retry(name_starts_with=blob):
    file_name = "azfs://%s/%s/%s" % (storage_account, container, item.name)
    # NOTE(review): nesting reconstructed from unindented source; this
    # assumes a repeated name is skipped entirely rather than yielded
    # again - confirm against the original file.
    if file_name in seen_names:
      continue
    seen_names.add(file_name)
    listed += 1
    if listed % 10000 == 0:
      # Periodic progress message, once every 10000 listed files.
      if with_metadata:
        logging.info(
            "Finished computing file information of: %s files",
            len(seen_names))
      else:
        logging.info("Finished computing size of: %s files", len(seen_names))
    if with_metadata:
      modified_at = self._updated_to_seconds(item.last_modified)
      yield file_name, (item.size, modified_at)
    else:
      yield file_name, item.size
  # do not spam logs when list_prefix is likely used to check empty folder
  summary_level = logging.INFO if listed > 0 else logging.DEBUG
  logging.log(
      summary_level,
      "Finished listing %s files in %s seconds.",
      listed,
      time.time() - started_at)
[docs]
class BlobStorageDownloader(Downloader):
  """Downloader that reads byte ranges of a single blob.

  The blob's properties are fetched on construction to learn its size.
  """
  def __init__(self, client, path, buffer_size):
    """Resolves the blob client for ``path`` and records the blob size.

    Args:
      client: Blob service client used to obtain the blob client.
      path: azfs:// path of the blob to read.
      buffer_size: Stored on the instance; not otherwise used in this class.

    Raises:
      IOError: With ``errno.ENOENT`` if the properties lookup fails with
        ``ResourceNotFoundError`` and status code 404.
      ResourceNotFoundError: Re-raised for any other status code.
    """
    self._client = client
    self._path = path
    self._container, self._blob = parse_azfs_path(path)
    self._buffer_size = buffer_size
    self._blob_to_download = self._client.get_blob_client(
        self._container, self._blob)
    try:
      properties = self._get_object_properties()
    except ResourceNotFoundError as http_error:
      if http_error.status_code != 404:
        _LOGGER.error(
            'HTTP error while requesting file %s: %s', self._path, http_error)
        raise
      raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
    self._size = properties.size

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_beam_io_error_filter)
  def _get_object_properties(self):
    """Fetches the blob's properties, retrying per the decorator's filter."""
    return self._blob_to_download.get_blob_properties()

  @property
  def size(self):
    """Blob size as reported by its properties at construction time."""
    return self._size

  def get_range(self, start, end):
    """Returns the blob content between ``start`` and ``end`` as bytes.

    Args:
      start: Offset of the first byte to read.
      end: Offset one past the last byte to read.
    """
    # download_blob takes an offset and a length rather than an end offset.
    length = end - start
    blob_data = self._blob_to_download.download_blob(start, length)
    return blob_data.readall()
[docs]
class BlobStorageUploader(Uploader):
  """Uploader that spools written data to a temporary file.

  Data passed to ``put`` is buffered in a local temporary file; ``finish``
  uploads the buffered content as one blob and removes the temporary file.
  """
  def __init__(self, client, path, mime_type='application/octet-stream'):
    """Prepares the blob client and the local spool file.

    Args:
      client: Blob service client used to obtain the blob client.
      path: azfs:// path of the blob to write.
      mime_type: Value passed to ``ContentSettings`` for the uploaded blob.
    """
    self._client = client
    self._path = path
    self._container, self._blob = parse_azfs_path(path)
    self._content_settings = ContentSettings(mime_type)
    self._blob_to_upload = self._client.get_blob_client(
        self._container, self._blob)
    # Local spool file; it is deleted when closed in ``finish``.
    self._temporary_file = tempfile.NamedTemporaryFile()

  def put(self, data):
    """Appends ``data`` (an object exposing ``tobytes()``) to the spool."""
    self._temporary_file.write(data.tobytes())

  def finish(self):
    """Uploads the spooled content, overwriting any existing blob.

    The temporary file is always closed, and thereby deleted, whether or
    not the upload succeeds, so this method must be called only once.
    """
    try:
      # Read back through the handle we already hold: seek() flushes the
      # pending writes, and re-opening a NamedTemporaryFile by name while
      # it is still open is not portable.
      self._temporary_file.seek(0)
      self._blob_to_upload.upload_blob(
          self._temporary_file.read(),
          overwrite=True,
          content_settings=self._content_settings)
    finally:
      # Closing releases the file handle and removes the file from disk.
      self._temporary_file.close()