Source code for pyarrow.ipc

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Arrow file and stream reader/writer classes, and other messaging tools

import pyarrow as pa

from pyarrow.lib import (Message, MessageReader,  # noqa
                         read_message, read_record_batch, read_schema,
                         read_tensor, write_tensor,
                         get_record_batch_size, get_tensor_size)
import pyarrow.lib as lib


class _ReadPandasOption(object):

    def read_pandas(self, **options):
        """
        Read contents of stream and convert to pandas.DataFrame using
        Table.to_pandas

        Parameters
        ----------
        **options : arguments to forward to Table.to_pandas

        Returns
        -------
        df : pandas.DataFrame
        """
        table = self.read_all()
        return table.to_pandas(**options)


[docs]class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption): """ Reader for the Arrow streaming binary format Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object """
[docs] def __init__(self, source): self._open(source)
[docs]class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): """ Writer for the Arrow streaming binary format Parameters ---------- sink : str, pyarrow.NativeFile, or file-like Python object Either a file path, or a writable file object schema : pyarrow.Schema The Arrow schema for data to be written to the file """
[docs] def __init__(self, sink, schema): self._open(sink, schema)
[docs]class RecordBatchFileReader(lib._RecordBatchFileReader, _ReadPandasOption): """ Class for reading Arrow record batch data from the Arrow binary file format Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object footer_offset : int, default None If the file is embedded in some larger file, this is the byte offset to the very end of the file data """
[docs] def __init__(self, source, footer_offset=None): self._open(source, footer_offset=footer_offset)
[docs]class RecordBatchFileWriter(lib._RecordBatchFileWriter): """ Writer to create the Arrow binary file format Parameters ---------- sink : str, pyarrow.NativeFile, or file-like Python object Either a file path, or a writable file object schema : pyarrow.Schema The Arrow schema for data to be written to the file """
[docs] def __init__(self, sink, schema): self._open(sink, schema)
def open_stream(source): """ Create reader for Arrow streaming format Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object footer_offset : int, default None If the file is embedded in some larger file, this is the byte offset to the very end of the file data Returns ------- reader : RecordBatchStreamReader """ return RecordBatchStreamReader(source) def open_file(source, footer_offset=None): """ Create reader for Arrow file format Parameters ---------- source : bytes/buffer-like, pyarrow.NativeFile, or file-like Python object Either an in-memory buffer, or a readable file object footer_offset : int, default None If the file is embedded in some larger file, this is the byte offset to the very end of the file data Returns ------- reader : RecordBatchFileReader """ return RecordBatchFileReader(source, footer_offset=footer_offset) def serialize_pandas(df, nthreads=None, preserve_index=True): """Serialize a pandas DataFrame into a buffer protocol compatible object. Parameters ---------- df : pandas.DataFrame nthreads : int, default None Number of threads to use for conversion to Arrow, default all CPUs preserve_index : boolean, default True If True, preserve the pandas index data, otherwise the result will have a default RangeIndex Returns ------- buf : buffer An object compatible with the buffer protocol """ batch = pa.RecordBatch.from_pandas(df, nthreads=nthreads, preserve_index=preserve_index) sink = pa.BufferOutputStream() writer = pa.RecordBatchStreamWriter(sink, batch.schema) writer.write_batch(batch) writer.close() return sink.getvalue() def deserialize_pandas(buf, use_threads=True): """Deserialize a buffer protocol compatible object into a pandas DataFrame. Parameters ---------- buf : buffer An object compatible with the buffer protocol use_threads: boolean, default True Whether to parallelize the conversion using multiple threads Returns ------- df : pandas.DataFrame """ buffer_reader = pa.BufferReader(buf) reader = pa.RecordBatchStreamReader(buffer_reader) table = reader.read_all() return table.to_pandas(use_threads=use_threads)