S3 abstraction on Airflow

Why a top-level S3 abstraction is what you need

While there are many useful S3 operators, users are still left to manage the S3 URLs themselves and to make sure each one is templated in the format the receiving operator expects. As Airflow adoption grows, more and more concepts need to be injected into the S3 URL, which forces the user to write out the correct path structure by hand, with code like

prefix = "my-data/{{env}}/{{ds}}/file.csv"
url = f"s3://my-bucket/{prefix}"

to make sure the path can be used across different APIs. This then evolves into a large file with hundreds of lines of S3 paths that is hard to read and hard to reuse.

A simple S3 API to the rescue

We solved this problem by introducing an S3 API that abstracts away all the complex string-building logic and gives users a robust object-oriented API instead. At its core it has a simple mission: bridge Airflow concepts (operators, sensors, templating) with S3 concepts (prefixes, keys, URLs, connections) so that S3 usage from Airflow can be standardized.

from dataclasses import dataclass
from typing import Optional

@dataclass
class S3Data:
    bucket: str
    prefix: str
    env_template: Optional[str] = get_environment()  # in-house helper returning the deployment environment
    date_key: str = "dt"
    date_value: Optional[str] = "{{ ds }}"  # rendered by Airflow's templating
    default_filename: Optional[str] = None  # appended when building a full object URL

and users can simply write

my_data = S3Data(bucket="my-bucket", prefix="my-data", default_filename="file.csv")
spark = SparkOperator(args={"input": my_data.url()})
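Because the object carries the fully templated path, the same instance can be handed to sensors as well as operators. As a small illustration (a sketch, not part of the library itself), a downstream task could wait on the data with the Amazon provider's S3KeySensor, which accepts a full s3:// URL as its bucket_key:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Wait for the partition before consuming it downstream; the templated
# pieces of the URL (e.g. "{{ ds }}") are rendered by Airflow at runtime.
wait_for_data = S3KeySensor(
    task_id="wait_for_my_data",
    bucket_key=my_data.url(),  # full s3:// URL, so bucket_name can be omitted
)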

This also allows the infrastructure team to define best-practice path structures and patterns and embed them as defaults across the S3 ecosystem.
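To give a sense of how those defaults come together, here is a minimal sketch of what an S3Data.url() method could look like. The filename argument, the hive-style dt={{ ds }} partition, and the exact ordering of path segments are assumptions for illustration, not the real implementation:

# A possible S3Data.url() method (sketch only).
def url(self, filename: Optional[str] = None) -> str:
    """Assemble a fully templated s3:// URL from the configured defaults."""
    parts = [self.prefix]
    if self.env_template:
        parts.append(self.env_template)
    if self.date_key and self.date_value:
        parts.append(f"{self.date_key}={self.date_value}")  # e.g. dt={{ ds }}
    filename = filename or self.default_filename
    if filename:
        parts.append(filename)
    return f"s3://{self.bucket}/" + "/".join(parts)

Because every DAG goes through this one method, changing the partitioning scheme or the environment layout becomes a single change in the library rather than a hunt through hundreds of hand-written paths.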

Evolution

Once such an API is defined and adopted, it can evolve to support many use cases such as cross-team collaboration and data governance.

For instance, we have integrated our at-rest data governance framework into this S3 abstraction, so that retention and data compliance are configured in Airflow and exposed as `data.retention_tasks()` and `data.scrub_tasks()`.
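The shape of that integration can be sketched roughly as follows. The retention_days value, the _scrub_expired helper, and the use of S3Hook and PythonOperator here are illustrative assumptions; the real framework wires the governance configuration into retention_tasks() and scrub_tasks() internally:

from datetime import datetime, timedelta, timezone

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def _scrub_expired(bucket: str, prefix: str, retention_days: int, **_):
    """Delete objects under the prefix that are older than the retention window (sketch)."""
    hook = S3Hook()
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    expired = [
        obj.key
        for obj in hook.get_bucket(bucket).objects.filter(Prefix=prefix)
        if obj.last_modified < cutoff
    ]
    if expired:
        hook.delete_objects(bucket=bucket, keys=expired)

# A retention_tasks()-style helper can then turn the same S3Data object into a task:
retention_task = PythonOperator(
    task_id="retention_my_data",
    python_callable=_scrub_expired,
    op_kwargs={"bucket": my_data.bucket, "prefix": my_data.prefix, "retention_days": 30},
)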