Source code for hipscat.io.file_io.file_pointer

import os
from typing import Any, Dict, List, NewType, Tuple, Union

import fsspec

[docs] FilePointer = NewType("FilePointer", str)
"""Unified type for references to files."""
[docs] def get_file_protocol(pointer: FilePointer) -> str: """Method to parse filepointer for the filesystem protocol. If it doesn't follow the pattern of protocol://pathway/to/file, then it assumes that it is a localfilesystem. Args: pointer: filesystem pathway pointer """ if not isinstance(pointer, str): pointer = str(pointer) protocol = fsspec.utils.get_protocol(pointer) return protocol
[docs] def get_fs( file_pointer: FilePointer, storage_options: Union[Dict[Any, Any], None] = None ) -> Tuple[fsspec.filesystem, FilePointer]: """Create the abstract filesystem Args: file_pointer: filesystem pathway storage_options: dictionary that contains abstract filesystem credentials Raises: ImportError: if environment cannot import necessary libraries for fsspec filesystems. """ if storage_options is None: storage_options = {} protocol = get_file_protocol(file_pointer) file_pointer = get_file_pointer_for_fs(protocol, file_pointer) try: file_system = fsspec.filesystem(protocol, **storage_options) except ImportError as error: raise ImportError from error return file_system, file_pointer
[docs] def get_file_pointer_for_fs(protocol: str, file_pointer: FilePointer) -> FilePointer: """Creates the filepathway from the file_pointer. This will strip the protocol so that the file_pointer can be accessed from the filesystem: - abfs filesystems DO NOT require the account_name in the pathway - s3 filesystems DO require the account_name/container name in the pathway Args: protocol: str filesytem protocol, file, abfs, or s3 file_pointer: filesystem pathway """ if not isinstance(file_pointer, str): file_pointer = str(file_pointer) if "file" in protocol: # return the entire filepath for local files if "file://" in file_pointer: split_pointer = file_pointer.split("file://")[1] else: split_pointer = file_pointer elif protocol == "https": # https should include the protocol in the file path split_pointer = file_pointer # don't split else: split_pointer = file_pointer.split(f"{protocol}://")[1] return FilePointer(split_pointer)
[docs] def get_full_file_pointer(path: str, protocol_path: str) -> FilePointer: """Rebuilds the file_pointer with the protocol and account name if required""" protocol = get_file_protocol(protocol_path) if not path.startswith(protocol): path = f"{protocol}://{path}" return FilePointer(path)
[docs] def get_file_pointer_from_path(path: str, include_protocol: str = None) -> FilePointer: """Returns a file pointer from a path string""" if include_protocol: path = get_full_file_pointer(path, include_protocol) return FilePointer(path)
[docs] def get_basename_from_filepointer(pointer: FilePointer) -> str: """Returns the base name of a regular file. May return empty string if the file is a directory. Args: pointer: `FilePointer` object to find a basename within Returns: string representation of the basename of a file. """ return os.path.basename(pointer)
[docs] def strip_leading_slash_for_pyarrow(pointer: FilePointer, protocol: str) -> FilePointer: """Strips the leading slash for pyarrow read/write functions. This is required for pyarrow's underlying filesystem abstraction. Args: pointer: `FilePointer` object Returns: New file pointer with leading slash removed. """ if "file" not in protocol and str(pointer).startswith("/"): pointer = FilePointer(str(pointer).replace("/", "", 1)) return pointer
[docs] def append_paths_to_pointer(pointer: FilePointer, *paths: str) -> FilePointer: """Append directories and/or a file name to a specified file pointer. Args: pointer: `FilePointer` object to add path to paths: any number of directory names optionally followed by a file name to append to the pointer Returns: New file pointer to path given by joining given pointer and path names """ return FilePointer(os.path.join(pointer, *paths))
[docs] def does_file_or_directory_exist( pointer: FilePointer, storage_options: Union[Dict[Any, Any], None] = None ) -> bool: """Checks if a file or directory exists for a given file pointer Args: pointer: File Pointer to check if file or directory exists at storage_options: dictionary that contains abstract filesystem credentials Returns: True if file or directory at `pointer` exists, False if not """ file_system, pointer = get_fs(pointer, storage_options) return file_system.exists(pointer)
[docs] def is_regular_file(pointer: FilePointer, storage_options: Union[Dict[Any, Any], None] = None) -> bool: """Checks if a regular file (NOT a directory) exists for a given file pointer. Args: pointer: File Pointer to check if a regular file storage_options: dictionary that contains abstract filesystem credentials Returns: True if regular file at `pointer` exists, False if not or is a directory """ file_system, pointer = get_fs(pointer, storage_options) return file_system.isfile(pointer)
[docs] def find_files_matching_path( pointer: FilePointer, *paths: str, include_protocol=False, storage_options: Union[Dict[Any, Any], None] = None, ) -> List[FilePointer]: """Find files or directories matching the provided path parts. Args: pointer: base File Pointer in which to find contents paths: any number of directory names optionally followed by a file name. directory or file names may be replaced with `*` as a matcher. include_protocol: boolean on whether or not to include the filesystem protocol in the returned directory contents storage_options: dictionary that contains abstract filesystem credentials Returns: New file pointers to files found matching the path """ matcher = append_paths_to_pointer(pointer, *paths) file_system, _ = get_fs(pointer, storage_options) contents = [get_file_pointer_from_path(x) for x in file_system.glob(matcher)] if include_protocol: contents = [get_full_file_pointer(x, protocol_path=pointer) for x in contents] return contents
[docs] def directory_has_contents(pointer: FilePointer, storage_options: Union[Dict[Any, Any], None] = None) -> bool: """Checks if a directory already has some contents (any files or subdirectories) Args: pointer: File Pointer to check for existing contents storage_options: dictionary that contains abstract filesystem credentials Returns: True if there are any files or subdirectories below this directory. """ return len(find_files_matching_path(pointer, "*", storage_options=storage_options)) > 0
[docs] def get_directory_contents( pointer: FilePointer, include_protocol=False, storage_options: Union[Dict[Any, Any], None] = None ) -> List[FilePointer]: """Finds all files and directories in the specified directory. NB: This is not recursive, and will return only the first level of directory contents. Args: pointer: File Pointer in which to find contents include_protocol: boolean on whether or not to include the filesystem protocol in the returned directory contents storage_options: dictionary that contains abstract filesystem credentials Returns: New file pointers to files or subdirectories below this directory. """ file_system, file_pointer = get_fs(pointer, storage_options) contents = file_system.listdir(file_pointer) contents = [FilePointer(x["name"]) for x in contents] if len(contents) == 0: return [] contents.sort() if include_protocol: contents = [get_full_file_pointer(x, protocol_path=pointer) for x in contents] return contents