aws-parquet#


aws-parquet is a toolkit that enables working with parquet datasets on AWS. It handles AWS S3 reads/writes, AWS Glue catalog updates, and AWS Athena queries through a simple and intuitive interface.

Motivation#

The goal is to provide a simple and intuitive interface to create and manage parquet datasets on AWS.

aws-parquet makes use of the following tools:

  • awswrangler as an AWS SDK for pandas

  • pandera for pandas-based data validation

  • typeguard and pydantic for runtime type checking

Features#

aws-parquet provides a ParquetDataset class that enables the following operations:

  • create a parquet dataset that will get registered in AWS Glue

  • append new data to the dataset and update the AWS Glue catalog

  • read a partition of the dataset and perform proper schema validation and type casting

  • overwrite data in the dataset after performing proper schema validation and type casting

  • delete a partition of the dataset and update the AWS Glue catalog

  • query the dataset using AWS Athena
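
As a rough illustration of what "schema validation and type casting" means for a single column, the sketch below casts raw values to integers and then enforces a range constraint. It uses only the standard library and is not how aws-parquet or pandera implement validation; the helper name `cast_and_check` and its parameters are hypothetical.

```python
from typing import Iterable, List


def cast_and_check(values: Iterable[object], ge: int = 0, lt: int = 10) -> List[int]:
    """Cast each value to int, then enforce ge <= value < lt (hypothetical helper)."""
    result = []
    for position, raw in enumerate(values):
        # Nullability check: reject missing values before casting.
        if raw is None:
            raise ValueError(f"row {position}: null values are not allowed")
        # Type casting step, e.g. "1" -> 1 and 3.0 -> 3.
        value = int(raw)
        # Range check on the cast value.
        if not (ge <= value < lt):
            raise ValueError(f"row {position}: {value} is outside [{ge}, {lt})")
        result.append(value)
    return result


print(cast_and_check(["1", 2, 3.0]))  # [1, 2, 3]
```

In the library itself these rules are declared once in a pandera schema, so the same checks can apply on both reads and writes.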

How to set up#

Using pip:

pip install aws_parquet

How to use#

Create a parquet dataset that will get registered in AWS Glue

import os

from aws_parquet import ParquetDataset
import pandas as pd
import pandera as pa
from pandera.typing import Series

# define your pandera schema model
class MyDatasetSchemaModel(pa.SchemaModel):
    col1: Series[int] = pa.Field(nullable=False, ge=0, lt=10)
    col2: Series[pa.DateTime]
    col3: Series[float]

# configuration
database = "default"
bucket_name = os.environ["AWS_S3_BUCKET"]
table_name = "foo_bar"
path = f"s3://{bucket_name}/{table_name}/"
partition_cols = ["col1", "col2"]
schema = MyDatasetSchemaModel.to_schema()

# create the dataset
dataset = ParquetDataset(
    database=database,
    table=table_name,
    partition_cols=partition_cols,
    path=path,
    pandera_schema=schema,
)

dataset.create()
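
To make the role of `partition_cols` more concrete, here is a minimal sketch of how partition values typically map to S3 prefixes, assuming the Hive-style `column=value` layout that awswrangler uses for partitioned datasets. The helper `partition_prefix` and the bucket name `my-bucket` are hypothetical, and the exact formatting of values in real paths (datetimes in particular) depends on the column type.

```python
from typing import Dict, List


def partition_prefix(path: str, partition_cols: List[str], values: Dict[str, object]) -> str:
    """Build a Hive-style prefix for one partition (hypothetical helper)."""
    # Normalise the base path so it always ends with a single slash.
    base = path if path.endswith("/") else path + "/"
    # One "column=value" segment per partition column, in declaration order.
    parts = [f"{col}={values[col]}" for col in partition_cols]
    return base + "/".join(parts) + "/"


prefix = partition_prefix(
    "s3://my-bucket/foo_bar/",  # placeholder bucket, not a real one
    ["col1", "col2"],
    {"col1": 1, "col2": "2021-01-01"},
)
print(prefix)  # s3://my-bucket/foo_bar/col1=1/col2=2021-01-01/
```

The order of `partition_cols` determines the nesting of the prefixes, so `col1` forms the outer level and `col2` the inner one.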

Append new data to the dataset

df = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [1.0, 2.0, 3.0]
})

dataset.update(df)

Read a partition of the dataset

df = dataset.read({"col2": "2021-01-01"})
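
The call above passes only `col2`, even though the dataset is partitioned by both `col1` and `col2`. The sketch below models one plausible reading of such a partial specification: every partition whose values agree with the given columns is selected. This is an assumption about the semantics rather than a description of the library's internals, and `select_partitions` is a hypothetical helper.

```python
from typing import Dict, List


def select_partitions(
    partitions: List[Dict[str, object]], spec: Dict[str, object]
) -> List[Dict[str, object]]:
    """Keep partitions that match every column named in spec (hypothetical helper)."""
    return [
        partition
        for partition in partitions
        if all(partition.get(col) == value for col, value in spec.items())
    ]


# Partitions corresponding to the three example rows appended earlier.
partitions = [
    {"col1": 1, "col2": "2021-01-01"},
    {"col1": 2, "col2": "2021-01-02"},
    {"col1": 3, "col2": "2021-01-03"},
]

print(select_partitions(partitions, {"col2": "2021-01-01"}))
# [{'col1': 1, 'col2': '2021-01-01'}]
```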

Overwrite data in the dataset

df_overwrite = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [4.0, 5.0, 6.0]
})
dataset.update(df_overwrite, overwrite=True)

Query the dataset using AWS Athena

df = dataset.query("SELECT col1 FROM foo_bar")

Delete a partition of the dataset

dataset.delete({"col1": 1, "col2": "2021-01-01"})

Delete the dataset in its entirety

dataset.delete()
