#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Local File system implementation for accessing files on disk."""
# pytype: skip-file
import io
import os
import shutil
from typing import BinaryIO # pylint: disable=unused-import
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
__all__ = ['LocalFileSystem']
class LocalFileSystem(FileSystem):
  """A Local ``FileSystem`` implementation for accessing files on disk.

  Every operation is delegated to the host OS through ``os``, ``io`` and
  ``shutil``.
  """
  @classmethod
  def scheme(cls):
    """URI scheme for the FileSystem.

    Returns:
      Always ``None``; no scheme string is associated with this filesystem.
    """
    return None

  def join(self, basepath, *paths):
    """Join two or more pathname components for the filesystem.

    Args:
      basepath: string path of the first component of the path
      paths: path components to be added

    Returns: full path after combining all the passed components
    """
    return os.path.join(basepath, *paths)

  def split(self, path):
    """Splits the given path into two parts.

    Splits the path into a pair (head, tail) such that tail contains the last
    component of the path and head contains everything up to that. The path is
    made absolute (``os.path.abspath``) before it is split.

    Args:
      path: path as a string

    Returns:
      a pair of path components as strings.
    """
    return os.path.split(os.path.abspath(path))

  def mkdirs(self, path):
    """Recursively create directories for the provided path.

    Args:
      path: string path of the directory structure that should be created

    Raises:
      IOError: if the directories cannot be created, e.g. because the leaf
        directory already exists.
    """
    try:
      os.makedirs(path)
    except OSError as err:
      raise IOError(err) from err

  def has_dirs(self):
    """Whether this FileSystem supports directories."""
    return True

  def _url_dirname(self, url_or_path):
    """Pass through to os.path.dirname.

    This version uses os.path instead of posixpath to be compatible with the
    host OS.

    Args:
      url_or_path: A string in the form of /some/path.

    Returns: the directory portion of ``url_or_path``.
    """
    return os.path.dirname(url_or_path)

  def _list(self, dir_or_prefix):
    """List files in a location.

    The location is traversed with ``os.walk``, so files inside nested
    sub-directories are reported as well. Only files are yielded, never the
    directories themselves. Nothing is yielded when ``dir_or_prefix`` does not
    exist.

    Args:
      dir_or_prefix: (string) A directory or location prefix (for filesystems
        that don't have directories).

    Returns:
      Generator of ``FileMetadata`` objects.

    Raises:
      ``BeamIOError``: if listing fails, but not if no files were found.
    """
    if not self.exists(dir_or_prefix):
      return

    def list_files(root):
      for dirpath, _, files in os.walk(root):
        for filename in files:
          yield self.join(dirpath, filename)

    try:
      for f in list_files(dir_or_prefix):
        try:
          yield FileMetadata(f, os.path.getsize(f), os.path.getmtime(f))
        except OSError:
          # Files may disappear, such as when listing /tmp.
          pass
    except Exception as e:  # pylint: disable=broad-except
      raise BeamIOError("List operation failed", {dir_or_prefix: e}) from e

  def _path_open(
      self,
      path,
      mode,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO):
    """Helper function to open a file in the provided mode.

    Args:
      path: string path of the file to open
      mode: mode string handed to ``io.open`` (e.g. ``'rb'`` or ``'wb'``)
      mime_type: accepted for interface compatibility; not used here
      compression_type: Type of compression to be used for this object

    Returns: the raw file object when the resolved compression type is
      ``CompressionTypes.UNCOMPRESSED``, otherwise a ``CompressedFile``
      wrapping it.
    """
    compression_type = FileSystem._get_compression_type(path, compression_type)
    raw_file = io.open(path, mode)
    if compression_type == CompressionTypes.UNCOMPRESSED:
      return raw_file
    try:
      return CompressedFile(raw_file, compression_type=compression_type)
    except Exception:
      # Do not leak the OS-level handle if the wrapper cannot be constructed.
      raw_file.close()
      raise

  def create(
      self,
      path,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO) -> BinaryIO:
    """Returns a write channel for the given file path.

    Missing parent directories are created first.

    Args:
      path: string path of the file object to be written to the system
      mime_type: MIME type to specify the type of content in the file object
      compression_type: Type of compression to be used for this object

    Returns: file handle with a close function for the user to use
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError; the file then simply lives in the working directory.
    if parent:
      os.makedirs(parent, exist_ok=True)
    return self._path_open(path, 'wb', mime_type, compression_type)

  def open(
      self,
      path,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO) -> BinaryIO:
    """Returns a read channel for the given file path.

    Args:
      path: string path of the file object to be read from the system
      mime_type: MIME type to specify the type of content in the file object
      compression_type: Type of compression to be used for this object

    Returns: file handle with a close function for the user to use
    """
    return self._path_open(path, 'rb', mime_type, compression_type)

  def copy(self, source_file_names, destination_file_names):
    """Recursively copy the file tree from the source to the destination.

    Every (source, destination) pair is attempted even if an earlier pair
    failed; all failures are reported together. An existing destination is
    replaced.

    Args:
      source_file_names: list of source file objects that needs to be copied
      destination_file_names: list of destination of the new object

    Raises:
      ``BeamIOError``: if any of the copy operations fail
    """
    err_msg = (
        "source_file_names and destination_file_names should "
        "be equal in length")
    assert len(source_file_names) == len(destination_file_names), err_msg

    def _copy_path(source, destination):
      """Recursively copy the file tree from the source to the destination."""
      try:
        # Validate the source before touching the destination: otherwise a
        # missing source (or source == destination) would delete existing
        # data and only then fail.
        if not os.path.exists(source):
          raise FileNotFoundError('Source path does not exist: %s' % source)
        if os.path.exists(destination):
          if os.path.samefile(source, destination):
            raise shutil.SameFileError(
                '%r and %r are the same file' % (source, destination))
          if os.path.isdir(destination):
            shutil.rmtree(destination)
          else:
            os.remove(destination)
        if os.path.isdir(source):
          shutil.copytree(source, destination)
        else:
          shutil.copy2(source, destination)
      except OSError as err:
        raise IOError(err) from err

    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        _copy_path(source, destination)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e

    if exceptions:
      raise BeamIOError("Copy operation failed", exceptions)

  def rename(self, source_file_names, destination_file_names):
    """Rename the files at the source list to the destination list.

    Source and destination lists should be of the same size. Every pair is
    attempted even if an earlier pair failed; all failures are reported
    together.

    Args:
      source_file_names: List of file paths that need to be moved
      destination_file_names: List of destination_file_names for the files

    Raises:
      ``BeamIOError``: if any of the rename operations fail
    """
    err_msg = (
        "source_file_names and destination_file_names should "
        "be equal in length")
    assert len(source_file_names) == len(destination_file_names), err_msg

    def _rename_file(source, destination):
      """Rename a single file object."""
      try:
        os.rename(source, destination)
      except OSError as err:
        raise IOError(err) from err

    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        _rename_file(source, destination)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e

    if exceptions:
      raise BeamIOError("Rename operation failed", exceptions)

  def exists(self, path):
    """Check if the provided path exists on the FileSystem.

    Args:
      path: string path that needs to be checked.

    Returns: boolean flag indicating if path exists
    """
    return os.path.exists(path)

  def size(self, path):
    """Get size of path on the FileSystem.

    Args:
      path: string path in question.

    Returns: int size of path according to the FileSystem.

    Raises:
      ``BeamIOError``: if path doesn't exist.
    """
    try:
      return os.path.getsize(path)
    except Exception as e:  # pylint: disable=broad-except
      raise BeamIOError("Size operation failed", {path: e}) from e

  def last_updated(self, path):
    """Get UNIX Epoch time in seconds on the FileSystem.

    The value is the modification time reported by ``os.path.getmtime``.

    Args:
      path: string path of file.

    Returns: float UNIX Epoch time

    Raises:
      ``BeamIOError``: if path doesn't exist.
    """
    if not self.exists(path):
      raise BeamIOError('Path does not exist: %s' % path)
    return os.path.getmtime(path)

  def checksum(self, path):
    """Fetch checksum metadata of a file on the
    :class:`~apache_beam.io.filesystem.FileSystem`.

    No content hash is computed: the "checksum" is the file size rendered as a
    string.

    Args:
      path: string path of a file.

    Returns: string containing file size.

    Raises:
      ``BeamIOError``: if path isn't a file or doesn't exist.
    """
    if not self.exists(path):
      raise BeamIOError('Path does not exist: %s' % path)
    return str(os.path.getsize(path))

  def delete(self, paths):
    """Deletes files or directories at the provided paths.

    Directories will be deleted recursively. Each entry of ``paths`` is
    resolved through ``self.match``; an entry that matches nothing is reported
    as a failure. All matched paths are attempted before failures are raised.

    Args:
      paths: list of paths that give the file objects to be deleted

    Raises:
      ``BeamIOError``: if any of the delete operations fail
    """
    def _delete_path(path):
      """Recursively delete the file or directory at the provided path."""
      try:
        if os.path.isdir(path):
          shutil.rmtree(path)
        else:
          os.remove(path)
      except OSError as err:
        raise IOError(err) from err

    exceptions = {}

    def try_delete(path):
      try:
        _delete_path(path)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[path] = e

    for match_result in self.match(paths):
      metadata_list = match_result.metadata_list

      if not metadata_list:
        exceptions[match_result.pattern] = \
          IOError('No files found to delete under: %s' % match_result.pattern)

      for metadata in metadata_list:
        try_delete(metadata.path)

    if exceptions:
      raise BeamIOError("Delete operation failed", exceptions)