Interprocess messaging / communication (IPC)
Encapsulated message format
Data components in the stream and file formats are represented as encapsulated messages consisting of:
- A length prefix indicating the metadata size
- The message metadata as a Flatbuffer
- Padding bytes to an 8-byte boundary
- The message body, which must be a multiple of 8 bytes
Schematically, we have:
<metadata_size: int32>
<metadata_flatbuffer: bytes>
<padding>
<message body>
The complete serialized message must be a multiple of 8 bytes so that messages can be relocated between streams. Otherwise the amount of padding between the metadata and the message body could be non-deterministic.
The metadata_size includes the size of the flatbuffer plus padding. The
Message flatbuffer includes a version number, the particular message (as a
flatbuffer union), and the size of the message body:
table Message {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  header: MessageHeader;
  bodyLength: long;
}
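As a rough illustration, here is a minimal sketch in Python (not part of any Arrow library; the helper name and the little-endian handling of the int32 prefix are assumptions based on the description above) of how a writer could frame one encapsulated message given already-serialized metadata and body bytes:
import struct

def write_encapsulated_message(sink, metadata: bytes, body: bytes) -> None:
    # Hypothetical helper: frames one message as
    # <metadata_size: int32> <metadata> <padding> <body>.
    # Assumes `body` has already been padded to a multiple of 8 bytes
    # and that the int32 prefix is little-endian.
    prefix_and_metadata = 4 + len(metadata)
    padding = (8 - prefix_and_metadata % 8) % 8
    # metadata_size includes the flatbuffer plus the padding bytes.
    metadata_size = len(metadata) + padding
    sink.write(struct.pack("<i", metadata_size))
    sink.write(metadata)
    sink.write(b"\x00" * padding)
    sink.write(body)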
Currently, we support 5 types of messages:
- Schema
- RecordBatch
- DictionaryBatch
- Tensor
- SparseTensor
Streaming format
We provide a streaming format for record batches. It is presented as a sequence
of encapsulated messages, each of which follows the format above. The schema
comes first in the stream, and it is the same for all of the record batches
that follow. If any fields in the schema are dictionary-encoded, one or more
DictionaryBatch messages will be included. DictionaryBatch and RecordBatch
messages may be interleaved, but before any dictionary key is used in a
RecordBatch it should be defined in a DictionaryBatch.
<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<DICTIONARY x DELTA>
...
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: int32>
When a stream reader implementation is reading a stream, after each message it
may read the next 4 bytes to determine the size of the message metadata that
follows. Once the metadata flatbuffer has been read, it can then read the
message body.
The stream writer can signal end-of-stream (EOS) either by writing a 0 length
as an int32 or simply closing the stream interface.
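As a usage sketch (assuming the pyarrow library is installed; the example data is made up), the streaming format can be produced and consumed with pyarrow's IPC stream writer and reader:
import pyarrow as pa

# One record batch with a single int64 column (example data).
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["x"])

# Writing: the schema message goes out first, then each record batch;
# closing the writer emits the end-of-stream marker.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# Reading: the reader consumes the schema, then yields record batches
# until it reaches EOS.
reader = pa.ipc.open_stream(sink.getvalue())
for received in reader:
    print(received.num_rows)  # 3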
File format
We define a “file format” supporting random access in a very similar format to
the streaming format. The file starts and ends with a magic string ARROW1
(plus padding). What follows in the file is identical to the stream format. At
the end of the file, we write a footer containing a redundant copy of the
schema (which is a part of the streaming format) plus memory offsets and sizes
for each of the data blocks in the file. This enables random access to any
record batch in the file. See File.fbs for the precise details of the file
footer.
Schematically we have:
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">
In the file format, there is no requirement that dictionary keys should be
defined in a DictionaryBatch before they are used in a RecordBatch, as long
as the keys are defined somewhere in the file.
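A similar sketch for the file format (again assuming pyarrow; the example data is made up) shows the random access that the footer enables:
import pyarrow as pa

# Writing: the stream content is bracketed by the ARROW1 magic and
# followed by a footer recording the location of each block.
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["x"])
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
    writer.write_batch(batch)

# Reading: the footer allows fetching any record batch by index.
reader = pa.ipc.open_file(sink.getvalue())
print(reader.num_record_batches)      # 1
print(reader.get_batch(0).num_rows)   # 3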
RecordBatch body structure
The RecordBatch metadata contains a depth-first (pre-order) flattened set of
field metadata and physical memory buffers (some comments from Message.fbs
have been shortened / removed):
table RecordBatch {
  length: long;
  nodes: [FieldNode];
  buffers: [Buffer];
}

struct FieldNode {
  length: long;
  null_count: long;
}

struct Buffer {
  /// The relative offset into the shared memory page where the bytes for this
  /// buffer starts
  offset: long;

  /// The absolute length (in bytes) of the memory buffer. The memory is found
  /// from offset (inclusive) to offset + length (non-inclusive).
  length: long;
}
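For example, the nodes vector is the pre-order flattening of each field and its children. A small sketch of that flattening (the dict-based field representation here is purely hypothetical, not an Arrow API):
def flatten_nodes(field):
    # Pre-order: the parent's FieldNode comes before its children's.
    nodes = [(field["length"], field["null_count"])]
    for child in field.get("children", []):
        nodes.extend(flatten_nodes(child))
    return nodes

# Example: a List<Int32> column of length 3 whose child values array has
# length 7 (with one null) flattens to two FieldNode entries.
list_field = {"name": "xs", "length": 3, "null_count": 0,
              "children": [{"name": "item", "length": 7, "null_count": 1}]}
print(flatten_nodes(list_field))   # [(3, 0), (7, 1)]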
In the context of a file, the page is not used, and the Buffer offsets are
relative to the start of the message body. So, while in a general IPC setting
these offsets may be anywhere in one or more shared memory regions, in the
file format the offsets start from 0.
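As a sketch of how the buffers vector relates to the message body (the 8-byte alignment used here follows the padding rules above; real implementations may pad buffers to 64 bytes, and none of these helpers are Arrow APIs), the Buffer entries could be computed like this:
def pad8(n: int) -> int:
    # Round n up to the next multiple of 8.
    return (n + 7) & ~7

def layout_buffers(buffer_sizes):
    # Return (offset, length) pairs relative to the start of the message
    # body, plus the total (padded) body length.
    entries = []
    offset = 0
    for size in buffer_sizes:
        entries.append((offset, size))
        offset += pad8(size)   # next buffer starts on an 8-byte boundary
    return entries, offset

# Hypothetical example: a nullable int32 field of length 5 has a validity
# bitmap (1 byte of bits) and a values buffer (5 * 4 = 20 bytes).
print(layout_buffers([1, 20]))   # ([(0, 1), (8, 20)], 32)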
The location of a record batch, along with the sizes of its metadata block and
its body of buffers, is stored in the file footer:
struct Block {
  offset: long;
  metaDataLength: int;
  bodyLength: long;
}
The metaDataLength here includes the metadata length prefix, serialized
metadata, and any additional padding bytes, and by construction must be a
multiple of 8 bytes.
Some notes about this:
- The Block offset indicates the starting byte of the record batch.
- The metadata length includes the flatbuffer size, the record batch metadata
flatbuffer, and any padding bytes.
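As an illustration (a hypothetical helper, not a pyarrow API), a reader can use a Block entry to load a single record batch message without scanning the rest of the file:
def read_block(f, offset, metadata_length, body_length):
    # Return (metadata_bytes, body_bytes) for one record batch message.
    f.seek(offset)
    # metadata_length covers the int32 size prefix, the metadata flatbuffer,
    # and any padding, so this read ends exactly where the body begins.
    metadata = f.read(metadata_length)
    body = f.read(body_length)
    return metadata, body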
Dictionary Batches
Dictionaries are written in the stream and file formats as a sequence of
record batches, each having a single field. The complete semantic schema for
a sequence of record batches therefore consists of the schema along with all
of the dictionaries. The dictionary types are found in the schema, so it is
necessary to read the schema first to determine the dictionary types, so that
the dictionaries can be properly interpreted.
table DictionaryBatch {
  id: long;
  data: RecordBatch;
  isDelta: bool = false;
}
The dictionary id
in the message metadata can be referenced one or more times
in the schema, so that dictionaries can even be used for multiple fields. See
the Physical memory layout document for more about the semantics of
dictionary-encoded data.
The dictionary isDelta flag allows dictionary batches to be modified
mid-stream. A dictionary batch with isDelta set indicates that its vector
should be concatenated with those of any previous batches with the same id.
A stream which encodes one column, the list of strings
["A", "B", "C", "B", "D", "C", "E", "A"], with a delta dictionary batch could
take the form:
<SCHEMA>
<DICTIONARY 0>
(0) "A"
(1) "B"
(2) "C"
<RECORD BATCH 0>
0
1
2
1
<DICTIONARY 0 DELTA>
(3) "D"
(4) "E"
<RECORD BATCH 1>
3
2
4
0
EOS
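As a sketch of the reader-side bookkeeping this implies (plain Python lists stand in for Arrow vectors; none of these helpers are Arrow APIs), dictionaries and deltas could be applied like this:
dictionaries = {}  # dictionary id -> list of values seen so far

def apply_dictionary_batch(dict_id, values, is_delta=False):
    if is_delta:
        # Delta batches are concatenated onto the existing dictionary.
        dictionaries[dict_id].extend(values)
    else:
        # A non-delta batch defines the dictionary for this id.
        dictionaries[dict_id] = list(values)

def decode_column(dict_id, indices):
    # Replace each dictionary key with the value it refers to.
    values = dictionaries[dict_id]
    return [values[i] for i in indices]

# Replaying the example stream above:
apply_dictionary_batch(0, ["A", "B", "C"])
print(decode_column(0, [0, 1, 2, 1]))             # ['A', 'B', 'C', 'B']
apply_dictionary_batch(0, ["D", "E"], is_delta=True)
print(decode_column(0, [3, 2, 4, 0]))             # ['D', 'C', 'E', 'A']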
Tensor (Multi-dimensional Array) Message Format
The Tensor message type provides a way to write a multidimensional array of
fixed-size values (such as a NumPy ndarray) using Arrow’s shared memory
tools. Arrow implementations in general are not required to implement this
data format, though we provide a reference implementation in C++.
When writing a standalone encapsulated tensor message, we use the format as indicated above, but additionally align the starting offset of the metadata as well as the starting offset of the tensor body (if writing to a shared memory region) to be multiples of 64 bytes:
<PADDING>
<metadata size: int32>
<metadata>
<tensor body>
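As a usage sketch (assuming numpy and pyarrow are installed, and that pyarrow's tensor IPC helpers are available), a standalone tensor message can be written and read back like this:
import numpy as np
import pyarrow as pa

# Wrap a NumPy ndarray as an Arrow Tensor.
ndarray = np.arange(12, dtype=np.float64).reshape(3, 4)
tensor = pa.Tensor.from_numpy(ndarray)

# Write the encapsulated tensor message (metadata plus aligned body).
sink = pa.BufferOutputStream()
pa.ipc.write_tensor(tensor, sink)

# Read it back from the serialized bytes.
result = pa.ipc.read_tensor(pa.BufferReader(sink.getvalue()))
print(result.shape)   # (3, 4)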
SparseTensor Message Format
The SparseTensor message type provides another way to write a
multidimensional array of fixed-size values using Arrow’s shared memory
tools, in addition to Tensor. SparseTensor is designed specifically for
tensors whose elements are mostly zero. As with Tensor, Arrow implementations
in general are not required to implement this data format.
When writing a standalone encapsulated sparse tensor message, we use the format as indicated above, but additionally align the starting offset of the metadata as well as the starting offsets of the sparse index and the sparse tensor body (if writing to a shared memory region) to be multiples of 64 bytes:
<PADDING>
<metadata size: int32>
<metadata>
<sparse index>
<PADDING>
<sparse tensor body>
The contents of the sparse tensor index depend on which sparse format is used.