#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""":class:`~apache_beam.io.filesystem.FileSystem` implementation for accessing
Hadoop Distributed File System files."""
# pytype: skip-file
import io
import logging
import posixpath
import re
from typing import BinaryIO # pylint: disable=unused-import
import hdfs
from apache_beam.io import filesystemio
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.options.pipeline_options import PipelineOptions
__all__ = ['HadoopFileSystem']
_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')
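# _URL_RE matches URLs of the form 'hdfs://path/...' (no cluster name), while
# _FULL_URL_RE matches 'hdfs://server/path/...' and captures the server
# component separately. Which form is expected is controlled by the
# hdfs_full_urls pipeline option (see _parse_url below).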
_COPY_BUFFER_SIZE = 2**16
_DEFAULT_BUFFER_SIZE = 20 * 1024 * 1024
# WebHDFS FileChecksum property constants.
_FILE_CHECKSUM_ALGORITHM = 'algorithm'
_FILE_CHECKSUM_BYTES = 'bytes'
_FILE_CHECKSUM_LENGTH = 'length'
# WebHDFS FileStatus property constants.
_FILE_STATUS_LENGTH = 'length'
_FILE_STATUS_UPDATED = 'modificationTime'
_FILE_STATUS_PATH_SUFFIX = 'pathSuffix'
_FILE_STATUS_TYPE = 'type'
_FILE_STATUS_TYPE_DIRECTORY = 'DIRECTORY'
_FILE_STATUS_TYPE_FILE = 'FILE'
_LOGGER = logging.getLogger(__name__)
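# HdfsDownloader and HdfsUploader adapt the hdfs client to the
# filesystemio.Downloader / filesystemio.Uploader interfaces; they are wrapped
# by filesystemio.DownloaderStream and filesystemio.UploaderStream in
# HadoopFileSystem._open and HadoopFileSystem._create below to expose
# file-like objects.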
class HdfsDownloader(filesystemio.Downloader):
def __init__(self, hdfs_client, path):
self._hdfs_client = hdfs_client
self._path = path
self._size = self._hdfs_client.status(path)[_FILE_STATUS_LENGTH]
@property
def size(self):
return self._size
def get_range(self, start, end):
with self._hdfs_client.read(self._path, offset=start,
length=end - start) as reader:
return reader.read()
class HdfsUploader(filesystemio.Uploader):
def __init__(self, hdfs_client, path):
self._hdfs_client = hdfs_client
if self._hdfs_client.status(path, strict=False) is not None:
raise BeamIOError('Path already exists: %s' % path)
self._handle_context = self._hdfs_client.write(path)
self._handle = self._handle_context.__enter__()
def put(self, data):
# The hdfs library uses an async writer that first adds data to a queue. Copy
# the buffer here (bytes(data) makes a copy) so it cannot be reused upstream
# before it has actually been written.
self._handle.write(bytes(data))
def finish(self):
self._handle.__exit__(None, None, None)
self._handle = None
self._handle_context = None
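# Note: HdfsUploader manually enters the hdfs client's write() context manager
# in __init__ and exits it in finish(), so the upload handle stays open across
# multiple put() calls.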
class HadoopFileSystem(FileSystem):
"""``FileSystem`` implementation that supports HDFS.
URL arguments to methods expect strings starting with ``hdfs://``.
"""
def __init__(self, pipeline_options):
"""Initializes a connection to HDFS.
Connection configuration is done by passing pipeline options.
See :class:`~apache_beam.options.pipeline_options.HadoopFileSystemOptions`.
"""
super().__init__(pipeline_options)
logging.getLogger('hdfs.client').setLevel(logging.WARN)
if pipeline_options is None:
raise ValueError('pipeline_options is not set')
if isinstance(pipeline_options, PipelineOptions):
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_host = hdfs_options.hdfs_host
hdfs_port = hdfs_options.hdfs_port
hdfs_user = hdfs_options.hdfs_user
self._full_urls = hdfs_options.hdfs_full_urls
else:
hdfs_host = pipeline_options.get('hdfs_host')
hdfs_port = pipeline_options.get('hdfs_port')
hdfs_user = pipeline_options.get('hdfs_user')
self._full_urls = pipeline_options.get('hdfs_full_urls', False)
if hdfs_host is None:
raise ValueError('hdfs_host is not set')
if hdfs_port is None:
raise ValueError('hdfs_port is not set')
if hdfs_user is None:
raise ValueError('hdfs_user is not set')
if not isinstance(self._full_urls, bool):
raise ValueError(
'hdfs_full_urls should be bool, got: %s' % self._full_urls)
self._hdfs_client = hdfs.InsecureClient(
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
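# Illustrative construction sketch (host/port/user values are hypothetical);
# __init__ accepts either a PipelineOptions instance or a plain dict:
#
#   HadoopFileSystem(PipelineOptions(
#       ['--hdfs_host=namenode', '--hdfs_port=50070', '--hdfs_user=beam']))
#   HadoopFileSystem(
#       {'hdfs_host': 'namenode', 'hdfs_port': 50070, 'hdfs_user': 'beam'})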
@classmethod
def scheme(cls):
return 'hdfs'
def _parse_url(self, url):
"""Verifies that url begins with hdfs:// prefix, strips it and adds a
leading /.
Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.
Args:
url: (str) A URL in the form hdfs://path/...
or in the form hdfs://server/path/...
Raises:
ValueError: if the URL doesn't match the expected format.
Returns:
(str, str) If using hdfs_full_urls, for an input of
'hdfs://server/path/...' will return (server, '/path/...').
Otherwise, for an input of 'hdfs://path/...', will return
('', '/path/...').
"""
if not self._full_urls:
m = _URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return '', m.group(1)
else:
m = _FULL_URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return m.group(1), m.group(2) or '/'
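# Illustrative _parse_url behavior: with hdfs_full_urls=False,
# 'hdfs://dir/file' parses to ('', '/dir/file'); with hdfs_full_urls=True,
# 'hdfs://server/dir/file' parses to ('server', '/dir/file').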
def join(self, base_url, *paths):
"""Join two or more pathname components.
Args:
base_url: string path of the first component of the path.
Must start with hdfs://.
paths: path components to be added
Returns:
Full url after combining all the passed components.
"""
server, basepath = self._parse_url(base_url)
return _HDFS_PREFIX + self._join(server, basepath, *paths)
def _join(self, server, basepath, *paths):
res = posixpath.join(basepath, *paths)
if server:
server = '/' + server
return server + res
def split(self, url):
server, rel_path = self._parse_url(url)
if server:
server = '/' + server
head, tail = posixpath.split(rel_path)
return _HDFS_PREFIX + server + head, tail
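# Illustrative join/split behavior (assuming hdfs_full_urls=False):
#   join('hdfs://tmp/dir', 'file')  -> 'hdfs://tmp/dir/file'
#   split('hdfs://tmp/dir/file')    -> ('hdfs://tmp/dir', 'file')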
def mkdirs(self, url):
_, path = self._parse_url(url)
if self._exists(path):
raise BeamIOError('Path already exists: %s' % path)
return self._mkdirs(path)
def _mkdirs(self, path):
self._hdfs_client.makedirs(path)
def has_dirs(self):
return True
def _list(self, url):
try:
server, path = self._parse_url(url)
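# WebHDFS reports modificationTime in milliseconds; FileMetadata expects
# seconds, hence the division by 1000 below.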
for res in self._hdfs_client.list(path, status=True):
yield FileMetadata(
_HDFS_PREFIX + self._join(server, path, res[0]),
res[1][_FILE_STATUS_LENGTH],
res[1][_FILE_STATUS_UPDATED] / 1000.0)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError('List operation failed', {url: e})
@staticmethod
def _add_compression(stream, path, mime_type, compression_type):
if mime_type != 'application/octet-stream':
_LOGGER.warning(
'Mime types are not supported. Got non-default mime_type:'
' %s',
mime_type)
if compression_type == CompressionTypes.AUTO:
compression_type = CompressionTypes.detect_compression_type(path)
if compression_type != CompressionTypes.UNCOMPRESSED:
return CompressedFile(stream)
return stream
def create(
self,
url,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO) -> BinaryIO:
"""
Returns:
A Python File-like object.
"""
_, path = self._parse_url(url)
return self._create(path, mime_type, compression_type)
def _create(
self,
path,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
stream = io.BufferedWriter(
filesystemio.UploaderStream(HdfsUploader(self._hdfs_client, path)),
buffer_size=_DEFAULT_BUFFER_SIZE)
return self._add_compression(stream, path, mime_type, compression_type)
def open(
self,
url,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO) -> BinaryIO:
"""
Returns:
A Python File-like object.
"""
_, path = self._parse_url(url)
return self._open(path, mime_type, compression_type)
def _open(
self,
path,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
stream = io.BufferedReader(
filesystemio.DownloaderStream(HdfsDownloader(self._hdfs_client, path)),
buffer_size=_DEFAULT_BUFFER_SIZE)
return self._add_compression(stream, path, mime_type, compression_type)
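# Illustrative read/write usage (hypothetical URL); with CompressionTypes.AUTO
# the '.gz' extension selects gzip, so writes are compressed and reads are
# transparently decompressed:
#
#   with fs.create('hdfs://tmp/out.gz') as f:
#     f.write(b'data')
#   with fs.open('hdfs://tmp/out.gz') as f:
#     data = f.read()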
def copy(self, source_file_names, destination_file_names):
"""
It is an error if any file to copy already exists at the destination.
Raises ``BeamIOError`` if any error occurred.
Args:
source_file_names: iterable of URLs.
destination_file_names: iterable of URLs.
"""
if len(source_file_names) != len(destination_file_names):
raise BeamIOError(
'source_file_names and destination_file_names should '
'be equal in length: %d != %d' %
(len(source_file_names), len(destination_file_names)))
def _copy_file(source, destination):
with self._open(source) as f1:
with self._create(destination) as f2:
while True:
buf = f1.read(_COPY_BUFFER_SIZE)
if not buf:
break
f2.write(buf)
def _copy_path(source, destination):
"""Recursively copy the file tree from the source to the destination."""
if self._hdfs_client.status(
source)[_FILE_STATUS_TYPE] != _FILE_STATUS_TYPE_DIRECTORY:
_copy_file(source, destination)
return
for path, dirs, files in self._hdfs_client.walk(source):
for dir in dirs:
new_dir = self._join('', destination, dir)
if not self._exists(new_dir):
self._mkdirs(new_dir)
rel_path = posixpath.relpath(path, source)
if rel_path == '.':
rel_path = ''
for file in files:
_copy_file(
self._join('', path, file),
self._join('', destination, rel_path, file))
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
_copy_path(rel_source, rel_destination)
except Exception as e: # pylint: disable=broad-except
exceptions[(source, destination)] = e
if exceptions:
raise BeamIOError('Copy operation failed', exceptions)
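# Note: copy() recursively copies directory trees. Copying a file over an
# existing destination fails because HdfsUploader refuses to overwrite an
# existing path (see HdfsUploader.__init__).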
def rename(self, source_file_names, destination_file_names):
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
try:
self._hdfs_client.rename(rel_source, rel_destination)
except hdfs.HdfsError as e:
raise BeamIOError(
'libhdfs error in renaming %s to %s' % (source, destination), e)
except Exception as e: # pylint: disable=broad-except
exceptions[(source, destination)] = e
if exceptions:
raise BeamIOError('Rename operation failed', exceptions)
def exists(self, url: str) -> bool:
"""Checks existence of url in HDFS.
Args:
url: String in the form hdfs://...
Returns:
True if url exists as a file or directory in HDFS.
"""
_, path = self._parse_url(url)
return self._exists(path)
def _exists(self, path):
"""Returns True if path exists as a file or directory in HDFS.
Args:
path: String in the form /...
"""
return self._hdfs_client.status(path, strict=False) is not None
def size(self, url):
"""Fetches file size for a URL.
Returns:
int size of path according to the FileSystem.
Raises:
``BeamIOError``: if url doesn't exist.
"""
return self.metadata(url).size_in_bytes
def last_updated(self, url):
"""Fetches last updated time for a URL.
Args:
url: string url of file.
Returns: float UNIX Epoch time
Raises:
``BeamIOError``: if path doesn't exist.
"""
return self.metadata(url).last_updated_in_seconds
def checksum(self, url):
"""Fetches a checksum description for a URL.
Returns:
String describing the checksum.
Raises:
``BeamIOError``: if url doesn't exist.
"""
_, path = self._parse_url(url)
file_checksum = self._hdfs_client.checksum(path)
return '%s-%d-%s' % (
file_checksum[_FILE_CHECKSUM_ALGORITHM],
file_checksum[_FILE_CHECKSUM_LENGTH],
file_checksum[_FILE_CHECKSUM_BYTES],
)
def delete(self, urls):
exceptions = {}
for url in urls:
try:
_, path = self._parse_url(url)
self._hdfs_client.delete(path, recursive=True)
except Exception as e: # pylint: disable=broad-except
exceptions[url] = e
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)