Methodology


Writing a serverless ETL pipeline on AWS can be approached from many angles: for example, one could use Glue ETL with Step Functions or Glue workflows for scheduling, Redshift, Databricks, etc.

However, these options might be too complex for users who simply want to transform a single table while staying within Athena. The natural choice for such tasks is an Athena view. However, Athena's views come with several limitations:

  • If the query is too complex or has too many stages, it will fail
  • Views cannot be shared through Lake Formation filters
  • Max query run time of 30 minutes
  • Etc.

So, in the cases above, the instinct is to reach for Create Table As Select (CTAS), but keeping the destination table fresh (essentially establishing a minimal ETL pipeline) can be quite tricky.

In this blog post, I present a solution that uses Athena's new Iceberg functionality to build such minimal ETL pipelines. I consider this a minimal yet functional way of constructing an ETL pipeline for those who don't have the data engineering background to set up a Glue job, write PySpark, etc.

It also fits nicely with Iceberg's ACID nature, which supports evolving the schema and optimizing the files underneath the table.
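
To give a flavour of the schema evolution part: adding a column to an Iceberg table in Athena is a single DDL statement. The sketch below is illustrative only; my_column_3 is a made-up column name:

	-- Add a column to the Iceberg table; existing data files are not rewritten.
	-- my_column_3 is a hypothetical column used for illustration.
	ALTER TABLE target_db.target_table ADD COLUMNS (my_column_3 string);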

Creating the target table

First, we'll need to create the table while specifying its columns and their transformations from the source. Let's assume that target_db is already created:

	
	CREATE TABLE IF NOT EXISTS "target_db"."target_table" WITH (
	  table_type = 'ICEBERG',
	  is_external = false,
	  location = 's3://my-bucket/iceberg-data/'
	) AS
	SELECT
	  "id",
	  "my_column_1",
	  "my_column_2",
	  CAST("lastmodified" AS timestamp(6)) AS "lastmodified"
	  -- Athena/Iceberg doesn't work with the default timestamp(3),
	  -- see: https://github.com/trinodb/trino/issues/6658
	FROM
	  "source_db"."my_table"
	LIMIT
	  0;
	

The first part of the code creates the new table in the target_db database; because of the LIMIT 0, it starts out empty for demonstration purposes. The table is created with table_type = 'ICEBERG' and is_external = false, i.e. it is not an external table. For some reason, you must specify this flag explicitly; otherwise, Athena complains about the location.

In this case, the table's location must also be specified: the data will be stored in the S3 bucket "my-bucket" under the "iceberg-data" prefix. The table has four columns: id, my_column_1, my_column_2 and lastmodified. The lastmodified column is cast to timestamp(6) (microsecond precision) because Athena's default timestamp(3) (millisecond precision) is incompatible with Iceberg.

Feel free to replace this simple select with any SQL query you like!
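
For instance, the target could just as well be an aggregate. The following sketch is purely illustrative; the orders table and its columns are made-up assumptions:

	-- Hypothetical example: an aggregated target instead of a 1:1 copy.
	CREATE TABLE IF NOT EXISTS "target_db"."orders_per_customer" WITH (
	  table_type = 'ICEBERG',
	  is_external = false,
	  location = 's3://my-bucket/iceberg-data-orders-per-customer/'
	) AS
	SELECT
	  "customer_id",
	  COUNT(*) AS "order_count",
	  MAX(CAST("lastmodified" AS timestamp(6))) AS "lastmodified"
	FROM
	  "source_db"."orders"
	GROUP BY
	  "customer_id";

Keep in mind, though, that the further the target diverges from a row-for-row copy, the more thought the MERGE step in the next section needs.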

Updating the table

Once the table has been created, you can sync rows from the source into it using the snippet below. Note that, as written, it only inserts rows that are missing from the target; an extended variant that also covers updates and deletes follows further down:

	
	MERGE INTO "target_db"."target_table" "new"
	USING "source_db"."source_table" "source" ON (
	  ("new"."id" = "source"."id")
	  AND ("new"."lastmodified" = "source"."lastmodified")
	)
	WHEN NOT MATCHED THEN
	INSERT (
	  "id",
	  "my_column_1",
	  "my_column_2",
	  "lastmodified"
	)
	VALUES (
	  "source"."id",
	  "source"."my_column_1",
	  "source"."my_column_2",
	  CAST("source"."lastmodified" AS timestamp(6))
	)
	

The second part of the code merges data from the source_table into the target_table. Rows are matched on the id and lastmodified columns: if a row with the same id and lastmodified values already exists in the target_table, nothing is inserted. Any source row without such a match is inserted, which covers both brand-new rows and rows whose lastmodified has changed; note that, as written, a changed row therefore ends up as an additional row rather than an in-place update.
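
If you want changed rows to be updated in place and deleted rows to be removed as well, the MERGE can be extended with WHEN MATCHED clauses. The following sketch is an assumption-laden illustration rather than part of the original pipeline: it matches on id only and presumes a hypothetical is_deleted soft-delete flag on the source table, so adapt it to your schema:

	MERGE INTO "target_db"."target_table" "new"
	USING "source_db"."source_table" "source"
	ON ("new"."id" = "source"."id")
	-- is_deleted is a hypothetical soft-delete flag on the source table
	WHEN MATCHED AND "source"."is_deleted" THEN DELETE
	-- Update rows whose source version is newer than the target's
	WHEN MATCHED AND "new"."lastmodified" < CAST("source"."lastmodified" AS timestamp(6)) THEN
	UPDATE SET
	  "my_column_1" = "source"."my_column_1",
	  "my_column_2" = "source"."my_column_2",
	  "lastmodified" = CAST("source"."lastmodified" AS timestamp(6))
	WHEN NOT MATCHED THEN
	INSERT ("id", "my_column_1", "my_column_2", "lastmodified")
	VALUES (
	  "source"."id",
	  "source"."my_column_1",
	  "source"."my_column_2",
	  CAST("source"."lastmodified" AS timestamp(6))
	)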

This command needs to be run on a schedule to keep the target table in sync. That can easily be achieved by scheduling it with Step Functions. Also, don't forget OPTIMIZE: frequent merges produce many small files that should be compacted periodically.
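
For reference, this is roughly what the maintenance statements look like in Athena; adding VACUUM to expire old snapshots is my suggestion on top of the OPTIMIZE mentioned above:

	-- Compact small data files into larger ones
	OPTIMIZE "target_db"."target_table" REWRITE DATA USING BIN_PACK;
	-- Expire old snapshots and remove files that are no longer referenced
	VACUUM "target_db"."target_table";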

Summary

In this blog post, we looked at using Athena's new Iceberg functionality to create a minimal SQL-based ETL pipeline on AWS. It presents a solution for those who don't have the necessary data engineering knowledge to set up a Glue job, write PySpark, etc.

The solution replaces Athena views with Create Table As Select (CTAS) to create a target table and then uses the MERGE INTO command to sync (insert/update/delete) rows between the target table and a source table. Running the MERGE INTO statement on a schedule yields a minimal ETL pipeline.

Author: Attila Papp

Attila Papp works as a Solution Architect for adesso.

