115 changes: 99 additions & 16 deletions docs/content/pypaimon/python-api.md
@@ -687,22 +687,105 @@ Row kind values:

## Data Types

-| Python Native Type  | PyArrow Type                                     | Paimon Type                       |
-|:--------------------|:-------------------------------------------------|:----------------------------------|
-| `int`               | `pyarrow.int8()`                                 | `TINYINT`                         |
-| `int`               | `pyarrow.int16()`                                | `SMALLINT`                        |
-| `int`               | `pyarrow.int32()`                                | `INT`                             |
-| `int`               | `pyarrow.int64()`                                | `BIGINT`                          |
-| `float`             | `pyarrow.float32()`                              | `FLOAT`                           |
-| `float`             | `pyarrow.float64()`                              | `DOUBLE`                          |
-| `bool`              | `pyarrow.bool_()`                                | `BOOLEAN`                         |
-| `str`               | `pyarrow.string()`                               | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
-| `bytes`             | `pyarrow.binary()`                               | `BYTES`, `VARBINARY(n)`           |
-| `bytes`             | `pyarrow.binary(length)`                         | `BINARY(length)`                  |
-| `decimal.Decimal`   | `pyarrow.decimal128(precision, scale)`           | `DECIMAL(precision, scale)`       |
-| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)`               | `TIMESTAMP(p)`                    |
-| `datetime.date`     | `pyarrow.date32()`                               | `DATE`                            |
-| `datetime.time`     | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)`                         |
### Scalar Types

| Python Native Type | PyArrow Type | Paimon Type |
|:--------------------|:---------------------------------------|:----------------------------------|
| `int` | `pyarrow.int8()` | `TINYINT` |
| `int` | `pyarrow.int16()` | `SMALLINT` |
| `int` | `pyarrow.int32()` | `INT` |
| `int` | `pyarrow.int64()` | `BIGINT` |
| `float` | `pyarrow.float32()` | `FLOAT` |
| `float` | `pyarrow.float64()` | `DOUBLE` |
| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
| `bytes` | `pyarrow.large_binary()` | `BLOB` |
| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
| `datetime.datetime` | `pyarrow.timestamp('us', tz=None)` | `TIMESTAMP(p)` (p=4..6) |
| `datetime.datetime` | `pyarrow.timestamp('ms', tz=None)` | `TIMESTAMP(p)` (p=1..3) |
| `datetime.datetime` | `pyarrow.timestamp('s', tz=None)` | `TIMESTAMP(p)` (p=0) |
| `datetime.datetime` | `pyarrow.timestamp('ns', tz=None)` | `TIMESTAMP(p)` (p=7..9) |
| `datetime.datetime` | `pyarrow.timestamp('us', tz='UTC')` | `TIMESTAMP_LTZ(p)` (p=4..6) |
| `datetime.date` | `pyarrow.date32()` | `DATE` |
| `datetime.time` | `pyarrow.time32('ms')` | `TIME(p)` |
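
The `TIMESTAMP(p)` rows above imply a precision-to-unit rule. The following is a minimal sketch of that rule in plain Python; `arrow_timestamp_unit` is a hypothetical helper written for illustration and is not part of pypaimon or PyArrow. The returned string is the value you would pass as the `unit` argument of `pyarrow.timestamp`.

```python
def arrow_timestamp_unit(precision: int) -> str:
    """Return the Arrow time unit that the table above pairs with TIMESTAMP(p).

    Hypothetical helper for illustration only; it simply encodes the
    precision ranges listed in the Scalar Types table.
    """
    if not 0 <= precision <= 9:
        raise ValueError(f"precision must be in 0..9, got {precision}")
    if precision == 0:
        return "s"    # TIMESTAMP(0)    -> seconds
    if precision <= 3:
        return "ms"   # TIMESTAMP(1..3) -> milliseconds
    if precision <= 6:
        return "us"   # TIMESTAMP(4..6) -> microseconds
    return "ns"       # TIMESTAMP(7..9) -> nanoseconds


if __name__ == "__main__":
    for p in (0, 3, 6, 9):
        print(p, arrow_timestamp_unit(p))
```

For example, a `TIMESTAMP(6)` column would correspond to `pyarrow.timestamp(arrow_timestamp_unit(6), tz=None)`, assuming PyArrow is installed.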

### Complex Types

| Python Native Type | PyArrow Type | Paimon Type |
|:-------------------|:--------------------------------------|:-----------------------|
| `list` | `pyarrow.list_(element_type)` | `ARRAY<element_type>` |
| `dict` | `pyarrow.map_(key_type, value_type)` | `MAP<key, value>` |
| `dict` | `pyarrow.struct([field, ...])` | `ROW<field ...>` |

### VARIANT Type

`VARIANT` stores semi-structured, schema-flexible data (JSON objects, arrays, and primitives)
in the [Parquet Variant binary encoding](https://github.com/apache/parquet-format/blob/master/VariantEncoding.md).

pypaimon provides `GenericVariant` for encoding, decoding, and path extraction:

```python
from pypaimon.data.generic_variant import GenericVariant
```

**Reading a VARIANT column:**

```python
read_builder = table.new_read_builder()
result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())

for record in result.to_pylist():
    if (payload := record["payload"]) is not None:
        gv = GenericVariant.from_arrow_struct(payload)
        print(gv.to_python())                      # decode to Python object
        print(gv.variant_get("$.city", "string"))  # path extraction
```

**Writing a VARIANT column:**

```python
import pyarrow as pa
from pypaimon.data.generic_variant import GenericVariant

gv1 = GenericVariant.from_json('{"city": "Beijing", "age": 30}')
gv2 = GenericVariant.from_json('[1, 2, 3]')
gv3 = GenericVariant.from_json('null')

data = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int32()),
    "payload": GenericVariant.to_arrow_array([gv1, gv2, gv3]),
})

write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
```

**`GenericVariant` API:**

| Method | Description |
|:-------|:------------|
| `GenericVariant.from_json(json_str)` | Build from a JSON string |
| `GenericVariant.from_python(obj)` | Build from a Python object (`dict`, `list`, `int`, `str`, …) |
| `GenericVariant.from_arrow_struct({"value": b"...", "metadata": b"..."})` | Wrap raw bytes from an Arrow VARIANT struct row (read path) |
| `GenericVariant.to_arrow_array([gv1, gv2, None, ...])` | Convert a list of `GenericVariant` (or `None`) to a `pa.StructArray` for writing |
| `gv.to_python()` | Decode to native Python (`dict`, `list`, `int`, `str`, `None`, …) |
| `gv.to_json()` | Decode to a JSON string |
| `gv.variant_get(path, cast_type=None)` | Extract a value by JSONPath (e.g. `"$.address.city"`, `"$.tags[0]"`); optional `cast_type`: `"string"`, `"int"`, `"long"`, `"double"`, `"boolean"`, `"decimal"` |
| `gv.get_type()` | Return the `Type` enum of the root value |
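
To make the path syntax accepted by `variant_get` more concrete, here is a minimal sketch of how paths such as `"$.address.city"` and `"$.tags[0]"` select values from an already-decoded Python object (for example the result of `gv.to_python()`). `extract_path` is a hypothetical stand-in, not the pypaimon implementation: it handles only `.name` and `[index]` steps, and returning `None` for a missing path is an assumption.

```python
import re

# One path step: either ".name" (object field) or "[n]" (array index).
_STEP = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def extract_path(obj, path):
    """Walk a decoded Python value along a simple JSONPath-like expression.

    Hypothetical illustration only; returns None when the path does not
    resolve (an assumption, not confirmed pypaimon behaviour).
    """
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = obj
    pos = 1
    while pos < len(path):
        match = _STEP.match(path, pos)
        if match is None:
            raise ValueError(f"unsupported path syntax at offset {pos}: {path!r}")
        key, index = match.group(1), match.group(2)
        if key is not None:
            # Field access is only meaningful on dicts.
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        else:
            # Index access is only meaningful on lists.
            i = int(index)
            if not isinstance(current, list) or i >= len(current):
                return None
            current = current[i]
        pos = match.end()
    return current


if __name__ == "__main__":
    doc = {"address": {"city": "Beijing"}, "tags": ["a", "b"]}
    print(extract_path(doc, "$.address.city"))
    print(extract_path(doc, "$.tags[0]"))
```

The real `variant_get` works on the binary encoding and also applies the optional `cast_type`; this sketch only illustrates the path semantics.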

**Limitations:**

- `VARIANT` is supported only with the Parquet file format. Writing to ORC or Avro raises `NotImplementedError`.
- `VARIANT` cannot be used as a primary key or partition key.
- Shredded VARIANT files (written by Paimon Java with `typed_value` sub-fields) are readable
  via the raw `from_arrow_struct` path, but the extra fields are not automatically interpreted.
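
The first two limitations can be checked up front, before any data is written. The sketch below is a hypothetical pre-flight check: `validate_variant_usage` and its parameters are illustrative names, not a pypaimon API. Raising `NotImplementedError` for non-Parquet formats mirrors the behaviour stated above; using `ValueError` for key columns is an assumption.

```python
def validate_variant_usage(file_format, variant_columns,
                           primary_keys=(), partition_keys=()):
    """Fail fast if VARIANT columns are used in an unsupported way.

    Hypothetical helper for illustration; it only encodes the limitations
    listed above.
    """
    if not variant_columns:
        return  # nothing to check
    if file_format.lower() != "parquet":
        raise NotImplementedError(
            f"VARIANT requires the Parquet file format, got {file_format!r}")
    keyed = set(primary_keys) | set(partition_keys)
    offending = sorted(set(variant_columns) & keyed)
    if offending:
        # ValueError is an assumption; the source does not name the error type.
        raise ValueError(
            f"VARIANT columns cannot be primary or partition keys: {offending}")


if __name__ == "__main__":
    validate_variant_usage("parquet", ["payload"], primary_keys=["id"])
    print("ok")
```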

## Predicate

87 changes: 87 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -28,6 +28,7 @@
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
@@ -941,6 +942,92 @@ protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
        return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
    }

    /** Java writes a VARIANT-column table for Python to read (Java→Python E2E). */
    @Test
    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
    public void testJavaWriteVariantTable() throws Exception {
        Identifier identifier = identifier("variant_test");
        catalog.dropTable(identifier, true);
        Schema schema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .column("payload", DataTypes.VARIANT())
                        .option("bucket", "-1")
                        .build();
        catalog.createTable(identifier, schema, false);

        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = writeBuilder.newWrite();
                BatchTableCommit commit = writeBuilder.newCommit()) {
            write.write(
                    GenericRow.of(
                            1,
                            BinaryString.fromString("Alice"),
                            GenericVariant.fromJson("{\"age\":30,\"city\":\"Beijing\"}")));
            write.write(
                    GenericRow.of(
                            2,
                            BinaryString.fromString("Bob"),
                            GenericVariant.fromJson("{\"age\":25,\"city\":\"Shanghai\"}")));
            write.write(
                    GenericRow.of(
                            3,
                            BinaryString.fromString("Carol"),
                            GenericVariant.fromJson("[1,2,3]")));
            commit.commit(write.prepareCommit());
        }

        // Verify Java can read back what it wrote
        FileStoreTable readTable = (FileStoreTable) catalog.getTable(identifier);
        List<Split> splits = new ArrayList<>(readTable.newSnapshotReader().read().dataSplits());
        TableRead read = readTable.newRead();
        List<String> res =
                getResult(read, splits, row -> internalRowToString(row, readTable.rowType()));
        assertThat(res).hasSize(3);
        LOG.info("testJavaWriteVariantTable: wrote and read back {} VARIANT rows", res.size());
    }

    /** Java reads a VARIANT-column table written by Python (Python→Java E2E). */
    @Test
    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
    public void testJavaReadVariantTable() throws Exception {
        Identifier identifier = identifier("py_variant_test");
        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
        List<Split> splits = new ArrayList<>(table.newSnapshotReader().read().dataSplits());
        TableRead read = table.newRead();
        List<String> res =
                getResult(read, splits, row -> internalRowToString(row, table.rowType()));
        assertThat(res).hasSize(4);

        // Verify the VARIANT column is present in the schema
        assertThat(table.rowType().getFieldNames()).contains("payload");
        assertThat(table.rowType().getTypeAt(table.rowType().getFieldIndex("payload")))
                .isEqualTo(DataTypes.VARIANT());

        // Verify each row's VARIANT payload can be decoded by Java
        List<Split> splits2 = new ArrayList<>(table.newSnapshotReader().read().dataSplits());
        try (org.apache.paimon.reader.RecordReader<InternalRow> reader =
                read.createReader(splits2)) {
            reader.forEachRemaining(
                    row -> {
                        int id = row.getInt(0);
                        if (id == 4) {
                            // null payload
                            assertThat(row.isNullAt(2)).isTrue();
                        } else {
                            assertThat(row.isNullAt(2)).isFalse();
                            org.apache.paimon.data.variant.Variant v = row.getVariant(2);
                            assertThat(v).isNotNull();
                        }
                    });
        }
        LOG.info(
                "testJavaReadVariantTable: Java read {} VARIANT rows written by Python",
                res.size());
    }

    /** Step 1: Write 5 base files for compact conflict test. */
    @Test
    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
79 changes: 78 additions & 1 deletion paimon-python/dev/run_mixed_tests.sh
@@ -339,6 +339,55 @@ run_blob_alter_compact_test() {
    fi
}

# Function to run VARIANT test (Java write, Python read)
run_java_variant_write_py_read_test() {
    echo -e "${YELLOW}=== Running VARIANT Test (Java Write, Python Read) ===${NC}"

    cd "$PROJECT_ROOT"

    echo "Running Maven test for JavaPyE2ETest.testJavaWriteVariantTable..."
    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteVariantTable -pl paimon-core -q -Drun.e2e.tests=true; then
        echo -e "${GREEN}✓ Java VARIANT write test completed successfully${NC}"
    else
        echo -e "${RED}✗ Java VARIANT write test failed${NC}"
        return 1
    fi
    cd "$PAIMON_PYTHON_DIR"
    echo "Running Python test for JavaPyReadWriteTest.test_py_read_variant_table..."
    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_py_read_variant_table -v; then
        echo -e "${GREEN}✓ Python VARIANT read test completed successfully${NC}"
        return 0
    else
        echo -e "${RED}✗ Python VARIANT read test failed${NC}"
        return 1
    fi
}

# Function to run VARIANT test (Python write, Java read)
run_py_variant_write_java_read_test() {
    echo -e "${YELLOW}=== Running VARIANT Test (Python Write, Java Read) ===${NC}"

    cd "$PAIMON_PYTHON_DIR"
    echo "Running Python test for JavaPyReadWriteTest.test_py_write_variant_table..."
    if ! python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_py_write_variant_table -v; then
        echo -e "${RED}✗ Python VARIANT write test failed${NC}"
        return 1
    fi
    echo -e "${GREEN}✓ Python VARIANT write test completed successfully${NC}"

    echo ""

    cd "$PROJECT_ROOT"
    echo "Running Maven test for JavaPyE2ETest.testJavaReadVariantTable..."
    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaReadVariantTable -pl paimon-core -q -Drun.e2e.tests=true; then
        echo -e "${GREEN}✓ Java VARIANT read test completed successfully${NC}"
        return 0
    else
        echo -e "${RED}✗ Java VARIANT read test failed${NC}"
        return 1
    fi
}

# Main execution
main() {
    local java_write_result=0
@@ -352,6 +401,8 @@ main() {
    local lumina_vector_result=0
    local compact_conflict_result=0
    local blob_alter_compact_result=0
    local java_variant_write_py_read_result=0
    local py_variant_write_java_read_result=0

    # Detect Python version
    PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
@@ -448,6 +499,20 @@ main() {

    echo ""

    # Run VARIANT type test (Java write, Python read)
    if ! run_java_variant_write_py_read_test; then
        java_variant_write_py_read_result=1
    fi

    echo ""

    # Run VARIANT Python-write Java-read test
    if ! run_py_variant_write_java_read_test; then
        py_variant_write_java_read_result=1
    fi

    echo ""

    echo -e "${YELLOW}=== Test Results Summary ===${NC}"

    if [[ $java_write_result -eq 0 ]]; then
@@ -516,12 +581,24 @@ main() {
        echo -e "${RED}✗ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): FAILED${NC}"
    fi

    if [[ $java_variant_write_py_read_result -eq 0 ]]; then
        echo -e "${GREEN}✓ VARIANT Type Test (Java Write, Python Read): PASSED${NC}"
    else
        echo -e "${RED}✗ VARIANT Type Test (Java Write, Python Read): FAILED${NC}"
    fi

    if [[ $py_variant_write_java_read_result -eq 0 ]]; then
        echo -e "${GREEN}✓ VARIANT Type Test (Python Write, Java Read): PASSED${NC}"
    else
        echo -e "${RED}✗ VARIANT Type Test (Python Write, Java Read): FAILED${NC}"
    fi

    echo ""

    # Clean up warehouse directory after all tests
    cleanup_warehouse

-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 ]]; then
        echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
        return 0
    else