5 October 2023, by Attila Papp
A minimal ETL pipeline with Athena
Writing a serverless ETL pipeline on AWS can be approached from many angles: for example, one could use Glue ETL with Step Functions or Glue workflows for scheduling, Redshift, Databricks, etc.
However, these options might be a bit too complex for those users who want to transform one table while staying within Athena. The best choice for such tasks is to use an Athena view. However, Athena's views have several limitations:
- If the query is too complex or has too many stages, it will fail
- Views cannot be shared through Lake Formation filters
- Max query run time of 30 minutes
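For context, the view-based approach that runs into these limitations is just a stored query (the table and column names here are placeholders):

```sql
-- A view only stores the query; it is re-executed on every read,
-- which is why the complexity and runtime limits above apply.
CREATE OR REPLACE VIEW "target_db"."my_view" AS
SELECT
    "id",
    "my_column_1",
    "my_column_2",
    CAST("lastmodified" AS timestamp) AS "lastmodified"
FROM "source_db"."my_table";
```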
So, in the cases above, reaching for Create Table As Select (CTAS) is a natural instinct, but keeping the destination table fresh (essentially establishing a minimal ETL pipeline) can be pretty tricky.
In this blog post, I present a solution that utilizes Athena's new Iceberg functionality for achieving such minimal ETL pipelines. I consider this a minimal yet functional way of constructing an ETL pipeline for those people who don't have the necessary data engineering knowledge to set up a Glue job, write PySpark, etc.
It also fits nicely with Iceberg's ACID nature, which will support evolving the schema and optimizing the files underneath the table.
Creating the target table
First, we'll need to create the table while specifying its columns and their transformations from the source. Let's assume that target_db is already created:
CREATE TABLE IF NOT EXISTS "target_db"."target_table"
WITH (
    table_type = 'ICEBERG',
    is_external = false,
    location = 's3://my-bucket/iceberg-data/'
) AS
SELECT
    "id",
    "my_column_1",
    "my_column_2",
    -- Athena/Iceberg doesn't work with the default timestamp(3),
    -- see: https://github.com/trinodb/trino/issues/6658
    CAST("lastmodified" AS timestamp(6)) AS "lastmodified"
FROM "source_db"."my_table"
LIMIT 0;
The first part of the code creates the new table in the target_db database, which will be empty (LIMIT 0) for demonstration purposes. The table is created with table_type 'ICEBERG' and is not an external table; for some reason, you must explicitly specify the is_external flag, otherwise Athena will complain about the location. The table's location must also be specified: here, an S3 bucket called "my-bucket", with the data stored under the "iceberg-data" prefix. The table has four columns: id, my_column_1, my_column_2, and lastmodified. The lastmodified column is cast to a timestamp with microsecond precision (timestamp(6)) because Athena's default millisecond timestamp (timestamp(3)) is incompatible with Iceberg.
Feel free to replace this simple select with any SQL query you like!
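For example, the placeholder SELECT could be swapped for an aggregating query (the source table and column names below are made up for illustration; the target columns would change accordingly):

```sql
SELECT
    "o"."customer_id" AS "id",
    COUNT(*) AS "order_count",
    SUM("o"."amount") AS "total_amount",
    CAST(MAX("o"."lastmodified") AS timestamp(6)) AS "lastmodified"
FROM "source_db"."orders" "o"
GROUP BY "o"."customer_id"
```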
Updating the table
Once the table has been created, you can sync (insert/update) rows using the snippet below:
MERGE INTO "target_db"."target_table" "new"
USING "source_db"."source_table" "source"
ON ("new"."id" = "source"."id")
WHEN MATCHED
    AND "new"."lastmodified" <> CAST("source"."lastmodified" AS timestamp(6))
THEN UPDATE SET
    "my_column_1" = "source"."my_column_1",
    "my_column_2" = "source"."my_column_2",
    "lastmodified" = CAST("source"."lastmodified" AS timestamp(6))
WHEN NOT MATCHED
THEN INSERT ("id", "my_column_1", "my_column_2", "lastmodified")
VALUES (
    "source"."id",
    "source"."my_column_1",
    "source"."my_column_2",
    CAST("source"."lastmodified" AS timestamp(6))
)
The second part of the code merges data from the source_table into the target_table, comparing the id and lastmodified columns. If a row with the same id and the same lastmodified value already exists in the target_table, it is left untouched. If a row with the same id but a different lastmodified value exists, the merge updates the existing row with the new column values. Rows whose id is not yet present in the target_table are inserted.
This command needs to be run on a schedule to keep the target table in sync; that can be easily achieved by scheduling it with Step Functions. Also, don't forget to run OPTIMIZE periodically, so that the small files produced by repeated merges get compacted.
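The maintenance statements could look like this (OPTIMIZE with bin-packing is Athena's built-in Iceberg compaction; VACUUM additionally expires old snapshots and removes files no longer referenced, subject to the table's retention properties):

```sql
-- Compact small data files written by repeated MERGE runs
OPTIMIZE "target_db"."target_table" REWRITE DATA USING BIN_PACK;

-- Expire old snapshots and delete unreferenced files
VACUUM "target_db"."target_table";
```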
In this blog post, we looked at using Athena's new Iceberg functionality to create a minimal SQL-based ETL pipeline on AWS. It presents a solution for those who don't have the necessary data engineering knowledge to set up a Glue job, write PySpark, etc.
The solution replaces Athena views with Create Table As Select (CTAS) to create a target table, then uses the MERGE INTO command to sync (insert/update) rows from a source table into the target table. Running the MERGE INTO statement on a schedule yields a minimal ETL pipeline.