# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import defaultdict
from concurrent import futures
from functools import partial
from six.moves.urllib.parse import urlparse
import json
import numpy as np
import os
import re
import six
import pyarrow as pa
import pyarrow.lib as lib
import pyarrow._parquet as _parquet
from pyarrow._parquet import (ParquetReader, RowGroupStatistics, # noqa
FileMetaData, RowGroupMetaData,
ColumnChunkMetaData,
ParquetSchema, ColumnSchema)
from pyarrow.compat import guid
from pyarrow.filesystem import (LocalFileSystem, _ensure_filesystem,
resolve_filesystem_and_path)
from pyarrow.util import _is_path_like, _stringify_path
_URI_STRIP_SCHEMES = ('hdfs',)
def _parse_uri(path):
path = _stringify_path(path)
parsed_uri = urlparse(path)
if parsed_uri.scheme in _URI_STRIP_SCHEMES:
return parsed_uri.path
else:
# ARROW-4073: On Windows returning the path with the scheme
# stripped removes the drive letter, if any
return path
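
# A minimal sketch of the behavior above (the URIs are hypothetical):
#
#   _parse_uri('hdfs://namenode:8020/data/x.parquet')  # -> '/data/x.parquet'
#   _parse_uri('C:\\data\\x.parquet')  # returned unchanged (ARROW-4073)
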
def _get_filesystem_and_path(passed_filesystem, path):
if passed_filesystem is None:
return resolve_filesystem_and_path(path, passed_filesystem)
else:
passed_filesystem = _ensure_filesystem(passed_filesystem)
parsed_path = _parse_uri(path)
return passed_filesystem, parsed_path
def _check_contains_null(val):
if isinstance(val, six.binary_type):
for byte in val:
if isinstance(byte, six.binary_type):
compare_to = chr(0)
else:
compare_to = 0
if byte == compare_to:
return True
elif isinstance(val, six.text_type):
return u'\x00' in val
return False
def _check_filters(filters):
"""
Check if filters are well-formed.
"""
if filters is not None:
if len(filters) == 0 or any(len(f) == 0 for f in filters):
raise ValueError("Malformed filters")
if isinstance(filters[0][0], six.string_types):
# We have encountered the situation where we have one nesting level
# too few:
# We have [(,,), ..] instead of [[(,,), ..]]
filters = [filters]
for conjunction in filters:
for col, op, val in conjunction:
if (
isinstance(val, list)
and all(_check_contains_null(v) for v in val)
or _check_contains_null(val)
):
raise NotImplementedError(
"Null-terminated binary strings are not supported as"
" filter values."
)
return filters
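
# A minimal sketch of the normalization performed above (the column names
# are hypothetical): a bare conjunction is wrapped into DNF form.
#
#   _check_filters([('year', '=', 2019), ('month', '>', 6)])
#   # -> [[('year', '=', 2019), ('month', '>', 6)]]
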
# ----------------------------------------------------------------------
# Reading a single Parquet file
class ParquetFile(object):
    """
    Reader interface for a single Parquet file

    Parameters
    ----------
    source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
        Readable source. For passing bytes or a buffer-like object
        containing a Parquet file, use pyarrow.BufferReader
    metadata : ParquetFileMetadata, default None
        Use existing metadata object, rather than reading from file.
    common_metadata : ParquetFileMetadata, default None
        Used in reads for pandas schema metadata when it is not found in the
        main file's metadata; it has no other uses at the moment
    memory_map : boolean, default True
        If the source is a file path, use a memory map to read the file,
        which can improve performance in some environments
    """
    def __init__(self, source, metadata=None, common_metadata=None,
memory_map=True):
self.reader = ParquetReader()
self.reader.open(source, use_memory_map=memory_map, metadata=metadata)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
def _build_nested_paths(self):
paths = self.reader.column_paths
result = defaultdict(list)
def _visit_piece(i, key, rest):
result[key].append(i)
if len(rest) > 0:
nested_key = '.'.join((key, rest[0]))
_visit_piece(i, nested_key, rest[1:])
for i, path in enumerate(paths):
_visit_piece(i, path[0], path[1:])
return result
@property
def metadata(self):
return self.reader.metadata
@property
def schema(self):
return self.metadata.schema
@property
def num_row_groups(self):
return self.reader.num_row_groups
    def read_row_group(self, i, columns=None, use_threads=True,
                       use_pandas_metadata=False):
        """
        Read a single row group from a Parquet file

        Parameters
        ----------
        columns : list
            If not None, only these columns will be read from the row group.
            A column name may be a prefix of a nested field, e.g. 'a' will
            select 'a.b', 'a.c', and 'a.d.e'
        use_threads : boolean, default True
            Perform multi-threaded column reads
        use_pandas_metadata : boolean, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded

        Returns
        -------
        pyarrow.table.Table
            Content of the row group as a table (of columns)
        """
column_indices = self._get_column_indices(
columns, use_pandas_metadata=use_pandas_metadata)
return self.reader.read_row_group(i, column_indices=column_indices,
use_threads=use_threads)
    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read a Table from Parquet format

        Parameters
        ----------
        columns : list
            If not None, only these columns will be read from the file. A
            column name may be a prefix of a nested field, e.g. 'a' will
            select 'a.b', 'a.c', and 'a.d.e'
        use_threads : boolean, default True
            Perform multi-threaded column reads
        use_pandas_metadata : boolean, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded

        Returns
        -------
        pyarrow.table.Table
            Content of the file as a table (of columns)
        """
column_indices = self._get_column_indices(
columns, use_pandas_metadata=use_pandas_metadata)
return self.reader.read_all(column_indices=column_indices,
use_threads=use_threads)
    def scan_contents(self, columns=None, batch_size=65536):
        """
        Read contents of file for the given columns and batch size with a
        single thread. The number of rows in the file is returned. This
        function is used for benchmarking.

        Parameters
        ----------
        columns : list of integers, default None
            If None, scan all columns
        batch_size : int, default 64K
            Number of rows to read at a time internally

        Returns
        -------
        num_rows : number of rows in file
        """
column_indices = self._get_column_indices(columns)
return self.reader.scan_contents(column_indices,
batch_size=batch_size)
def _get_column_indices(self, column_names, use_pandas_metadata=False):
if column_names is None:
return None
indices = []
for name in column_names:
if name in self._nested_paths_by_prefix:
indices.extend(self._nested_paths_by_prefix[name])
if use_pandas_metadata:
file_keyvalues = self.metadata.metadata
common_keyvalues = (self.common_metadata.metadata
if self.common_metadata is not None
else None)
if file_keyvalues and b'pandas' in file_keyvalues:
index_columns = _get_pandas_index_columns(file_keyvalues)
elif common_keyvalues and b'pandas' in common_keyvalues:
index_columns = _get_pandas_index_columns(common_keyvalues)
else:
index_columns = []
if indices is not None and index_columns:
indices += [self.reader.column_name_idx(descr['field_name'])
for descr in index_columns
if descr['kind'] == 'serialized']
return indices
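
# Example usage of ParquetFile (a minimal sketch; 'example.parquet' is a
# hypothetical local file):
#
#   import pyarrow.parquet as pq
#   pf = pq.ParquetFile('example.parquet')
#   print(pf.metadata.num_rows, pf.num_row_groups)
#   table = pf.read(columns=['a'])   # whole file, selected columns
#   first = pf.read_row_group(0)     # just the first row group
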
_SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]')
def _sanitized_spark_field_name(name):
return _SPARK_DISALLOWED_CHARS.sub('_', name)
def _sanitize_schema(schema, flavor):
if 'spark' in flavor:
sanitized_fields = []
schema_changed = False
for field in schema:
name = field.name
sanitized_name = _sanitized_spark_field_name(name)
if sanitized_name != name:
schema_changed = True
sanitized_field = pa.field(sanitized_name, field.type,
field.nullable, field.metadata)
sanitized_fields.append(sanitized_field)
else:
sanitized_fields.append(field)
new_schema = pa.schema(sanitized_fields, metadata=schema.metadata)
return new_schema, schema_changed
else:
return schema, False
def _sanitize_table(table, new_schema, flavor):
# TODO: This will not handle prohibited characters in nested field names
if 'spark' in flavor:
column_data = [table[i].data for i in range(table.num_columns)]
return pa.Table.from_arrays(column_data, schema=new_schema)
else:
return table
_parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
    The Parquet format version, defaults to 1.0
use_dictionary : bool or list
    Specify if we should use dictionary encoding in general or only for
    some columns.
use_deprecated_int96_timestamps : boolean, default None
    Write timestamps to INT96 Parquet format. Defaults to False unless enabled
    by flavor argument. This takes priority over the coerce_timestamps option.
coerce_timestamps : string, default None
    Cast timestamps to a particular resolution.
    Valid values: {None, 'ms', 'us'}
allow_truncated_timestamps : boolean, default False
    Allow loss of data when coercing timestamps to a particular
    resolution. E.g. if microsecond or nanosecond data is lost when coercing
    to 'ms', do not raise an exception
compression : str or dict
    Specify the compression codec, either on a general basis or per-column.
    Valid values: {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD'}
flavor : {'spark'}, default None
    Sanitize schema or set other options to improve compatibility
filesystem : FileSystem, default None
    If nothing passed, will be inferred from `where` if path-like, else
    `where` is already a file-like object so no filesystem is needed."""
class ParquetWriter(object):

    __doc__ = """
Class for incrementally building a Parquet file for Arrow tables

Parameters
----------
where : path or file-like object
schema : arrow Schema
{0}
""".format(_parquet_writer_arg_docs)
    def __init__(self, where, schema, filesystem=None,
flavor=None,
version='1.0',
use_dictionary=True,
compression='snappy',
use_deprecated_int96_timestamps=None, **options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
if flavor is not None and 'spark' in flavor:
use_deprecated_int96_timestamps = True
else:
use_deprecated_int96_timestamps = False
self.flavor = flavor
if flavor is not None:
schema, self.schema_changed = _sanitize_schema(schema, flavor)
else:
self.schema_changed = False
self.schema = schema
self.where = where
# If we open a file using a filesystem, store file handle so we can be
# sure to close it when `self.close` is called.
self.file_handle = None
filesystem, path = resolve_filesystem_and_path(where, filesystem)
if filesystem is not None:
sink = self.file_handle = filesystem.open(path, 'wb')
else:
sink = where
self.writer = _parquet.ParquetWriter(
sink, schema,
version=version,
compression=compression,
use_dictionary=use_dictionary,
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
**options)
self.is_open = True
def __del__(self):
if getattr(self, 'is_open', False):
self.close()
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
# return false since we want to propagate exceptions
return False
    def write_table(self, table, row_group_size=None):
if self.schema_changed:
table = _sanitize_table(table, self.schema, self.flavor)
assert self.is_open
if not table.schema.equals(self.schema, check_metadata=False):
msg = ('Table schema does not match schema used to create file: '
'\ntable:\n{0!s} vs. \nfile:\n{1!s}'.format(table.schema,
self.schema))
raise ValueError(msg)
self.writer.write_table(table, row_group_size=row_group_size)
    def close(self):
if self.is_open:
self.writer.close()
self.is_open = False
if self.file_handle is not None:
self.file_handle.close()
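
# Example usage of ParquetWriter (a minimal sketch; the table and output
# path are hypothetical):
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#   table = pa.Table.from_arrays([pa.array([1, 2, 3])], names=['x'])
#   with pq.ParquetWriter('out.parquet', table.schema) as writer:
#       writer.write_table(table)   # each call appends a new row group
#       writer.write_table(table)
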
def _get_pandas_index_columns(keyvalues):
return (json.loads(keyvalues[b'pandas'].decode('utf8'))
['index_columns'])
# ----------------------------------------------------------------------
# Metadata container providing instructions about reading a single Parquet
# file, possibly part of a partitioned dataset
class ParquetDatasetPiece(object):
    """
    A single chunk of a potentially larger Parquet dataset to read. The
    arguments will indicate to read either a single row group or all row
    groups, and whether to add partition keys to the resulting pyarrow.Table

    Parameters
    ----------
    path : str or pathlib.Path
        Path to file in the file system where this piece is located
    open_file_func : callable
        Function to use for obtaining file handle to dataset piece
    partition_keys : list of tuples
        [(column name, ordinal index)]
    row_group : int, default None
        Row group to load. By default, reads all row groups
    """
def __init__(self, path, open_file_func=partial(open, mode='rb'),
row_group=None, partition_keys=None):
self.path = _stringify_path(path)
self.open_file_func = open_file_func
self.row_group = row_group
self.partition_keys = partition_keys or []
def __eq__(self, other):
if not isinstance(other, ParquetDatasetPiece):
return False
return (self.path == other.path and
self.row_group == other.row_group and
self.partition_keys == other.partition_keys)
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return ('{0}({1!r}, row_group={2!r}, partition_keys={3!r})'
.format(type(self).__name__, self.path,
self.row_group,
self.partition_keys))
def __str__(self):
result = ''
if len(self.partition_keys) > 0:
partition_str = ', '.join('{0}={1}'.format(name, index)
for name, index in self.partition_keys)
result += 'partition[{0}] '.format(partition_str)
result += self.path
if self.row_group is not None:
result += ' | row_group={0}'.format(self.row_group)
return result
def get_metadata(self):
"""
Given a function that can create an open ParquetFile object, return the
file's metadata
"""
return self.open().metadata
def open(self):
"""
Returns instance of ParquetFile
"""
reader = self.open_file_func(self.path)
if not isinstance(reader, ParquetFile):
reader = ParquetFile(reader)
return reader
    def read(self, columns=None, use_threads=True, partitions=None,
             file=None, use_pandas_metadata=False):
        """
        Read this piece as a pyarrow.Table

        Parameters
        ----------
        columns : list of column names, default None
        use_threads : boolean, default True
            Perform multi-threaded column reads
        partitions : ParquetPartitions, default None
        file : file-like object
            Passed to ParquetFile
        use_pandas_metadata : boolean, default False
            If True and file has custom pandas schema metadata, ensure that
            index columns are also loaded

        Returns
        -------
        table : pyarrow.Table
        """
if self.open_file_func is not None:
reader = self.open()
elif file is not None:
reader = ParquetFile(file)
else:
# try to read the local path
reader = ParquetFile(self.path)
options = dict(columns=columns,
use_threads=use_threads,
use_pandas_metadata=use_pandas_metadata)
if self.row_group is not None:
table = reader.read_row_group(self.row_group, **options)
else:
table = reader.read(**options)
if len(self.partition_keys) > 0:
if partitions is None:
raise ValueError('Must pass partition sets')
# Here, the index is the categorical code of the partition where
# this piece is located. Suppose we had
#
# /foo=a/0.parq
# /foo=b/0.parq
# /foo=c/0.parq
#
# Then we assign a=0, b=1, c=2. And the resulting Table pieces will
# have a DictionaryArray column named foo having the constant index
# value as indicated. The distinct categories of the partition have
# been computed in the ParquetManifest
for i, (name, index) in enumerate(self.partition_keys):
# The partition code is the same for all values in this piece
indices = np.array([index], dtype='i4').repeat(len(table))
                # This is the set of all partition values, computed as part
                # of the manifest, so ['a', 'b', 'c'] as in our example above.
dictionary = partitions.levels[i].dictionary
arr = lib.DictionaryArray.from_arrays(indices, dictionary)
col = lib.Column.from_array(name, arr)
table = table.append_column(col)
return table
class PartitionSet(object):
    """
    A data structure for cataloguing the observed Parquet partitions at a
    particular level. So if we have

    /foo=a/bar=0
    /foo=a/bar=1
    /foo=a/bar=2
    /foo=b/bar=0
    /foo=b/bar=1
    /foo=b/bar=2

    then we have two partition sets, one for foo, another for bar. As we
    visit levels of the partition hierarchy, a PartitionSet tracks the
    distinct values and assigns categorical codes to use when reading the
    pieces
    """
def __init__(self, name, keys=None):
self.name = name
self.keys = keys or []
self.key_indices = {k: i for i, k in enumerate(self.keys)}
self._dictionary = None
def get_index(self, key):
"""
Get the index of the partition value if it is known, otherwise assign
one
"""
if key in self.key_indices:
return self.key_indices[key]
else:
index = len(self.key_indices)
self.keys.append(key)
self.key_indices[key] = index
return index
@property
def dictionary(self):
if self._dictionary is not None:
return self._dictionary
if len(self.keys) == 0:
raise ValueError('No known partition keys')
# Only integer and string partition types are supported right now
try:
integer_keys = [int(x) for x in self.keys]
dictionary = lib.array(integer_keys)
except ValueError:
dictionary = lib.array(self.keys)
self._dictionary = dictionary
return dictionary
@property
def is_sorted(self):
return list(self.keys) == sorted(self.keys)
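
# A minimal sketch of how a PartitionSet assigns categorical codes (the
# partition name and keys are hypothetical):
#
#   ps = PartitionSet('foo')
#   ps.get_index('a')   # -> 0, first distinct value observed
#   ps.get_index('b')   # -> 1
#   ps.get_index('a')   # -> 0, value already known
#   ps.dictionary       # -> pyarrow array of ['a', 'b']
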
class ParquetPartitions(object):
def __init__(self):
self.levels = []
self.partition_names = set()
def __len__(self):
return len(self.levels)
def __getitem__(self, i):
return self.levels[i]
    def get_index(self, level, name, key):
        """
        Record a partition value at a particular level, returning the distinct
        code for that value at that level.

        Example:

        partitions.get_index(1, 'foo', 'a') returns 0
        partitions.get_index(1, 'foo', 'b') returns 1
        partitions.get_index(1, 'foo', 'c') returns 2
        partitions.get_index(1, 'foo', 'a') returns 0

        Parameters
        ----------
        level : int
            The nesting level of the partition we are observing
        name : string
            The partition name
        key : string or int
            The partition value
        """
if level == len(self.levels):
if name in self.partition_names:
raise ValueError('{0} was the name of the partition in '
'another level'.format(name))
part_set = PartitionSet(name)
self.levels.append(part_set)
self.partition_names.add(name)
return self.levels[level].get_index(key)
def filter_accepts_partition(self, part_key, filter, level):
p_column, p_value_index = part_key
f_column, op, f_value = filter
if p_column != f_column:
return True
f_type = type(f_value)
if isinstance(f_value, set):
if not f_value:
raise ValueError("Cannot use empty set as filter value")
            if op not in {'in', 'not in'}:
                raise ValueError("Op '{0}' not supported with set "
                                 "value".format(op))
            if len(set(type(item) for item in f_value)) != 1:
                raise ValueError("All elements of set '{0}' must be of "
                                 "same type".format(f_value))
f_type = type(next(iter(f_value)))
p_value = f_type((self.levels[level]
.dictionary[p_value_index]
.as_py()))
if op == "=" or op == "==":
return p_value == f_value
elif op == "!=":
return p_value != f_value
elif op == '<':
return p_value < f_value
elif op == '>':
return p_value > f_value
elif op == '<=':
return p_value <= f_value
elif op == '>=':
return p_value >= f_value
elif op == 'in':
return p_value in f_value
elif op == 'not in':
return p_value not in f_value
        else:
            raise ValueError("'{0}' is not a valid operator in "
                             "predicates.".format(op))
class ParquetManifest(object):
    """
    Walks a dataset directory to discover the Parquet pieces, partition
    keys, and any _metadata / _common_metadata files it contains
    """
def __init__(self, dirpath, open_file_func=None, filesystem=None,
pathsep='/', partition_scheme='hive', metadata_nthreads=1):
filesystem, dirpath = _get_filesystem_and_path(filesystem, dirpath)
self.filesystem = filesystem
self.open_file_func = open_file_func
self.pathsep = pathsep
self.dirpath = _stringify_path(dirpath)
self.partition_scheme = partition_scheme
self.partitions = ParquetPartitions()
self.pieces = []
self._metadata_nthreads = metadata_nthreads
self._thread_pool = futures.ThreadPoolExecutor(
max_workers=metadata_nthreads)
self.common_metadata_path = None
self.metadata_path = None
self._visit_level(0, self.dirpath, [])
        # Due to concurrency, pieces will potentially be out of order if the
        # dataset is partitioned so we sort them to yield stable results
self.pieces.sort(key=lambda piece: piece.path)
if self.common_metadata_path is None:
# _common_metadata is a subset of _metadata
self.common_metadata_path = self.metadata_path
self._thread_pool.shutdown()
def _visit_level(self, level, base_path, part_keys):
fs = self.filesystem
_, directories, files = next(fs.walk(base_path))
filtered_files = []
for path in files:
full_path = self.pathsep.join((base_path, path))
if path.endswith('_common_metadata'):
self.common_metadata_path = full_path
elif path.endswith('_metadata'):
self.metadata_path = full_path
elif self._should_silently_exclude(path):
continue
else:
filtered_files.append(full_path)
# ARROW-1079: Filter out "private" directories starting with underscore
filtered_directories = [self.pathsep.join((base_path, x))
for x in directories
if not _is_private_directory(x)]
filtered_files.sort()
filtered_directories.sort()
if len(filtered_files) > 0 and len(filtered_directories) > 0:
raise ValueError('Found files in an intermediate '
'directory: {0}'.format(base_path))
elif len(filtered_directories) > 0:
self._visit_directories(level, filtered_directories, part_keys)
else:
self._push_pieces(filtered_files, part_keys)
def _should_silently_exclude(self, file_name):
return (file_name.endswith('.crc') or # Checksums
file_name.endswith('_$folder$') or # HDFS directories in S3
file_name.startswith('.') or # Hidden files
file_name in EXCLUDED_PARQUET_PATHS)
def _visit_directories(self, level, directories, part_keys):
futures_list = []
for path in directories:
head, tail = _path_split(path, self.pathsep)
name, key = _parse_hive_partition(tail)
index = self.partitions.get_index(level, name, key)
dir_part_keys = part_keys + [(name, index)]
            # If you have fewer threads than levels, the wait call will block
            # indefinitely due to multiple waits within a thread.
if level < self._metadata_nthreads:
future = self._thread_pool.submit(self._visit_level,
level + 1,
path,
dir_part_keys)
futures_list.append(future)
else:
self._visit_level(level + 1, path, dir_part_keys)
if futures_list:
futures.wait(futures_list)
def _parse_partition(self, dirname):
if self.partition_scheme == 'hive':
return _parse_hive_partition(dirname)
else:
            raise NotImplementedError('partition scheme: {0}'
                                      .format(self.partition_scheme))
def _push_pieces(self, files, part_keys):
self.pieces.extend([
ParquetDatasetPiece(path, partition_keys=part_keys,
open_file_func=self.open_file_func)
for path in files
])
def _parse_hive_partition(value):
if '=' not in value:
raise ValueError('Directory name did not appear to be a '
'partition: {0}'.format(value))
return value.split('=', 1)
def _is_private_directory(x):
_, tail = os.path.split(x)
return tail.startswith('_') and '=' not in tail
def _path_split(path, sep):
i = path.rfind(sep) + 1
head, tail = path[:i], path[i:]
head = head.rstrip(sep)
return head, tail
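
# A minimal sketch of the path helpers above (the paths are hypothetical):
#
#   _path_split('/data/foo=a', '/')      # -> ('/data', 'foo=a')
#   _parse_hive_partition('foo=a')       # -> ['foo', 'a']
#   _is_private_directory('/data/_tmp')  # -> True
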
EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
class ParquetDataset(object):
    """
    Encapsulates details of reading a complete Parquet dataset possibly
    consisting of multiple files and partitions in subdirectories

    Parameters
    ----------
    path_or_paths : str or List[str]
        A directory name, single file name, or list of file names
    filesystem : FileSystem, default None
        If nothing passed, paths assumed to be found in the local on-disk
        filesystem
    metadata : pyarrow.parquet.FileMetaData
        Use metadata obtained elsewhere to validate file schemas
    schema : pyarrow.parquet.Schema
        Use schema obtained elsewhere to validate file schemas. Alternative to
        metadata parameter
    split_row_groups : boolean, default False
        Divide files into pieces for each row group in the file
    validate_schema : boolean, default True
        Check that individual file schemas are all the same / compatible
    filters : List[Tuple] or List[List[Tuple]] or None (default)
        List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. This
        implements partition-level (hive) filtering only, i.e., to prevent the
        loading of some files of the dataset.

        Predicates are expressed in disjunctive normal form (DNF). This means
        that each innermost tuple describes a single column predicate. The
        inner predicates are combined with a conjunction (AND) into a larger
        predicate. The outermost list then combines all of the combined
        filters with a disjunction (OR). This lets us express all kinds of
        filters that are possible using boolean logic.

        This argument may also be passed as a List[Tuple]. Such predicates
        are evaluated as a conjunction. To express OR in predicates, one must
        use the (preferred) List[List[Tuple]] notation.
    metadata_nthreads : int, default 1
        How many threads to allow the thread pool which is used to read the
        dataset metadata. Increasing this is helpful to read partitioned
        datasets.
    memory_map : boolean, default True
        If the source is a file path, use a memory map to read each file in
        the dataset if possible, which can improve performance in some
        environments
    """
    def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, memory_map=True):
a_path = path_or_paths
if isinstance(a_path, list):
a_path = a_path[0]
self.fs, _ = _get_filesystem_and_path(filesystem, a_path)
if isinstance(path_or_paths, list):
self.paths = [_parse_uri(path) for path in path_or_paths]
else:
self.paths = _parse_uri(path_or_paths)
self.memory_map = memory_map
self._open_file_func = self._get_open_file_func()
(self.pieces,
self.partitions,
self.common_metadata_path,
self.metadata_path) = _make_manifest(
path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
open_file_func=self._open_file_func)
if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
self.common_metadata = (ParquetFile(f, memory_map=memory_map)
.metadata)
else:
self.common_metadata = None
if metadata is None and self.metadata_path is not None:
with self.fs.open(self.metadata_path) as f:
self.metadata = ParquetFile(f, memory_map=memory_map).metadata
else:
self.metadata = metadata
self.schema = schema
self.split_row_groups = split_row_groups
if split_row_groups:
raise NotImplementedError("split_row_groups not yet implemented")
if validate_schema:
self.validate_schemas()
if filters is not None:
filters = _check_filters(filters)
self._filter(filters)
    def validate_schemas(self):
if self.metadata is None and self.schema is None:
if self.common_metadata is not None:
self.schema = self.common_metadata.schema
else:
self.schema = self.pieces[0].get_metadata().schema
elif self.schema is None:
self.schema = self.metadata.schema
# Verify schemas are all compatible
dataset_schema = self.schema.to_arrow_schema()
# Exclude the partition columns from the schema, they are provided
# by the path, not the DatasetPiece
if self.partitions is not None:
for partition_name in self.partitions.partition_names:
if dataset_schema.get_field_index(partition_name) != -1:
field_idx = dataset_schema.get_field_index(partition_name)
dataset_schema = dataset_schema.remove(field_idx)
for piece in self.pieces:
file_metadata = piece.get_metadata()
file_schema = file_metadata.schema.to_arrow_schema()
if not dataset_schema.equals(file_schema, check_metadata=False):
raise ValueError('Schema in {0!s} was different. \n'
'{1!s}\n\nvs\n\n{2!s}'
.format(piece, file_schema,
dataset_schema))
    def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
        """
        Read multiple Parquet files as a single pyarrow.Table

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the file
        use_threads : boolean, default True
            Perform multi-threaded column reads
        use_pandas_metadata : bool, default False
            Passed through to each dataset piece

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns)
        """
tables = []
for piece in self.pieces:
table = piece.read(columns=columns, use_threads=use_threads,
partitions=self.partitions,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)
all_data = lib.concat_tables(tables)
if use_pandas_metadata:
# We need to ensure that this metadata is set in the Table's schema
# so that Table.to_pandas will construct pandas.DataFrame with the
# right index
common_metadata = self._get_common_pandas_metadata()
current_metadata = all_data.schema.metadata or {}
if common_metadata and b'pandas' not in current_metadata:
all_data = all_data.replace_schema_metadata({
b'pandas': common_metadata})
return all_data
    def read_pandas(self, **kwargs):
        """
        Read dataset including pandas metadata, if any. Other arguments are
        passed through to ParquetDataset.read; see its docstring for further
        details

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns)
        """
return self.read(use_pandas_metadata=True, **kwargs)
def _get_common_pandas_metadata(self):
if self.common_metadata is None:
return None
keyvalues = self.common_metadata.metadata
return keyvalues.get(b'pandas', None)
def _get_open_file_func(self):
if self.fs is None or isinstance(self.fs, LocalFileSystem):
def open_file(path, meta=None):
return ParquetFile(path, metadata=meta,
memory_map=self.memory_map,
common_metadata=self.common_metadata)
else:
def open_file(path, meta=None):
return ParquetFile(self.fs.open(path, mode='rb'),
memory_map=self.memory_map,
metadata=meta,
common_metadata=self.common_metadata)
return open_file
def _filter(self, filters):
accepts_filter = self.partitions.filter_accepts_partition
def one_filter_accepts(piece, filter):
return all(accepts_filter(part_key, filter, level)
for level, part_key in enumerate(piece.partition_keys))
def all_filters_accept(piece):
return any(all(one_filter_accepts(piece, f) for f in conjunction)
for conjunction in filters)
self.pieces = [p for p in self.pieces if all_filters_accept(p)]
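
# Example usage of ParquetDataset (a minimal sketch; the dataset root and
# the partition column 'year' are hypothetical):
#
#   import pyarrow.parquet as pq
#   dataset = pq.ParquetDataset('dataset_root/',
#                               filters=[('year', '=', 2019)])
#   table = dataset.read(columns=['value'])
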
def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
open_file_func=None):
partitions = None
common_metadata_path = None
metadata_path = None
if isinstance(path_or_paths, list) and len(path_or_paths) == 1:
# Dask passes a directory as a list of length 1
path_or_paths = path_or_paths[0]
if _is_path_like(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
open_file_func=open_file_func,
pathsep=fs.pathsep,
metadata_nthreads=metadata_nthreads)
common_metadata_path = manifest.common_metadata_path
metadata_path = manifest.metadata_path
pieces = manifest.pieces
partitions = manifest.partitions
else:
if not isinstance(path_or_paths, list):
path_or_paths = [path_or_paths]
# List of paths
if len(path_or_paths) == 0:
raise ValueError('Must pass at least one file path')
pieces = []
for path in path_or_paths:
if not fs.isfile(path):
raise IOError('Passed non-file path: {0}'
.format(path))
piece = ParquetDatasetPiece(path, open_file_func=open_file_func)
pieces.append(piece)
return pieces, partitions, common_metadata_path, metadata_path
_read_table_docstring = """
{0}

Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
    If a string is passed, it can be a single file name or directory name.
    For file-like objects, only read a single file. Use pyarrow.BufferReader
    to read a file contained in a bytes or buffer-like object
columns : list
    If not None, only these columns will be read from the file. A column
    name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
    'a.c', and 'a.d.e'
use_threads : boolean, default True
    Perform multi-threaded column reads
metadata : FileMetaData
    If separately computed
memory_map : boolean, default True
    If the source is a file path, use a memory map to read file, which can
    improve performance in some environments
{1}

Returns
-------
{2}
"""
def read_table(source, columns=None, use_threads=True, metadata=None,
use_pandas_metadata=False, memory_map=True,
filesystem=None):
if _is_path_like(source):
fs, path = _get_filesystem_and_path(filesystem, source)
return fs.read_parquet(path, columns=columns,
use_threads=use_threads, metadata=metadata,
use_pandas_metadata=use_pandas_metadata)
pf = ParquetFile(source, metadata=metadata)
return pf.read(columns=columns, use_threads=use_threads,
use_pandas_metadata=use_pandas_metadata)
read_table.__doc__ = _read_table_docstring.format(
'Read a Table from Parquet format',
"""use_pandas_metadata : boolean, default False
If True and file has custom pandas schema metadata, ensure that
index columns are also loaded""",
"""pyarrow.Table
Content of the file as a table (of columns)""")
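
# Example usage of read_table (a minimal sketch; the file is hypothetical):
#
#   import pyarrow.parquet as pq
#   table = pq.read_table('example.parquet', columns=['a', 'b'])
#   df = table.to_pandas()
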
def read_pandas(source, columns=None, use_threads=True, memory_map=True,
                metadata=None):
    return read_table(source, columns=columns,
                      use_threads=use_threads,
                      metadata=metadata, memory_map=memory_map,
                      use_pandas_metadata=True)
read_pandas.__doc__ = _read_table_docstring.format(
'Read a Table from Parquet format, also reading DataFrame\n'
'index values if known in the file metadata',
'',
"""pyarrow.Table
Content of the file as a Table of Columns, including DataFrame
indexes as columns""")
def write_table(table, where, row_group_size=None, version='1.0',
use_dictionary=True, compression='snappy',
use_deprecated_int96_timestamps=None,
coerce_timestamps=None,
allow_truncated_timestamps=False,
flavor=None, filesystem=None, **kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
try:
with ParquetWriter(
where, table.schema,
filesystem=filesystem,
version=version,
flavor=flavor,
use_dictionary=use_dictionary,
coerce_timestamps=coerce_timestamps,
allow_truncated_timestamps=allow_truncated_timestamps,
compression=compression,
use_deprecated_int96_timestamps=use_int96,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
if _is_path_like(where):
try:
os.remove(_stringify_path(where))
except os.error:
pass
raise
write_table.__doc__ = """
Write a Table to Parquet format

Parameters
----------
table : pyarrow.Table
where : string or pyarrow.NativeFile
{0}
""".format(_parquet_writer_arg_docs)
def _mkdir_if_not_exists(fs, path):
if fs._isfilestore() and not fs.exists(path):
try:
fs.mkdir(path)
except OSError:
assert fs.exists(path)
def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
                     **kwargs):
    """
    Wrapper around parquet.write_table for writing a Table to
    Parquet format by partitions.

    For each combination of partition columns and values,
    subdirectories are created in the following manner:

    root_dir/
      group1=value1
        group2=value1
          <uuid>.parquet
        group2=value2
          <uuid>.parquet
      group1=valueN
        group2=value1
          <uuid>.parquet
        group2=valueN
          <uuid>.parquet

    Parameters
    ----------
    table : pyarrow.Table
    root_path : string
        The root directory of the dataset
    filesystem : FileSystem, default None
        If nothing passed, paths assumed to be found in the local on-disk
        filesystem
    partition_cols : list
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given
    **kwargs : dict
        kwargs for the write_table function
    """
fs, root_path = _get_filesystem_and_path(filesystem, root_path)
_mkdir_if_not_exists(fs, root_path)
if partition_cols is not None and len(partition_cols) > 0:
df = table.to_pandas(ignore_metadata=True)
partition_keys = [df[col] for col in partition_cols]
data_df = df.drop(partition_cols, axis='columns')
data_cols = df.columns.drop(partition_cols)
if len(data_cols) == 0:
raise ValueError('No data left to save outside partition columns')
subschema = table.schema
# ARROW-2891: Ensure the output_schema is preserved when writing a
# partitioned dataset
for col in table.schema.names:
if col in partition_cols:
subschema = subschema.remove(subschema.get_field_index(col))
for keys, subgroup in data_df.groupby(partition_keys):
if not isinstance(keys, tuple):
keys = (keys,)
subdir = '/'.join(
['{colname}={value}'.format(colname=name, value=val)
for name, val in zip(partition_cols, keys)])
subtable = pa.Table.from_pandas(subgroup, preserve_index=False,
schema=subschema, safe=False)
prefix = '/'.join([root_path, subdir])
_mkdir_if_not_exists(fs, prefix)
outfile = guid() + '.parquet'
full_path = '/'.join([prefix, outfile])
with fs.open(full_path, 'wb') as f:
write_table(subtable, f, **kwargs)
else:
outfile = guid() + '.parquet'
full_path = '/'.join([root_path, outfile])
with fs.open(full_path, 'wb') as f:
write_table(table, f, **kwargs)
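
# Example usage of write_to_dataset (a minimal sketch; the table and root
# path are hypothetical):
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#   table = pa.Table.from_arrays([pa.array([1, 2]), pa.array(['a', 'b'])],
#                                names=['value', 'part'])
#   pq.write_to_dataset(table, 'dataset_root', partition_cols=['part'])
#   # -> dataset_root/part=a/<uuid>.parquet and
#   #    dataset_root/part=b/<uuid>.parquet
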
def read_schema(where, memory_map=False):
    """
    Read effective Arrow schema from Parquet file metadata

    Parameters
    ----------
    where : string (filepath) or file-like object
    memory_map : boolean, default False
        Create memory map when the source is a file path

    Returns
    -------
    schema : pyarrow.Schema
    """
return ParquetFile(where, memory_map=memory_map).schema.to_arrow_schema()
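
# Example usage of read_schema (a minimal sketch; the file is hypothetical):
#
#   import pyarrow.parquet as pq
#   schema = pq.read_schema('example.parquet')
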