Anant example-cassandra-etl-with-airflow-and-spark
License: No License Provided
Language: Python
In this walkthrough, we will cover how to use Airflow to trigger Spark ETL jobs that move data into and within Cassandra. This demo is relatively simple; however, it can be expanded by adding other technologies like Kafka, by scheduling the Spark jobs to make it a concurrent process, or more generally by creating more complex Cassandra ETL pipelines. We will focus on showing you how to connect Airflow, Spark, and Cassandra, and in our case today, specifically DataStax Astra. We are using DataStax Astra because we want everyone to be able to do this demo without having to worry about OS incompatibilities and the like. For that reason, we will also be using Gitpod.
For this walkthrough, we will use two Spark jobs. The first Spark job loads 100k rows from a CSV and writes them into a Cassandra table. The second Spark job reads the data from that Cassandra table, applies some transformations, and writes the transformed data into a different Cassandra table. We used PySpark to reduce the number of steps needed to get this working. If we used Scala, we would have to build the JARs, which would take more time. If you are interested in seeing how to use the Airflow Spark Submit Operator and run Scala Spark jobs, check out this walkthrough!
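The two jobs described above can be sketched in PySpark roughly as follows. This is a minimal sketch under stated assumptions, not the repository's actual code: the function names, the CSV path argument, and the column names `first_day` and `last_day` are hypothetical, and the Spark session is assumed to have the Spark Cassandra Connector available. Only the keyspace and table names come from this walkthrough.

```python
KEYSPACE = "airflowdemo"
SOURCE_TABLE = "previous_employees_by_job_title"
TARGET_TABLE = "days_worked_by_previous_employees_by_job_title"
# Data source name registered by the Spark Cassandra Connector.
CASSANDRA_FORMAT = "org.apache.spark.sql.cassandra"


def cassandra_options(table, keyspace=KEYSPACE):
    """Options the connector needs to locate a table."""
    return {"keyspace": keyspace, "table": table}


def load_and_write(spark, csv_path):
    """Job 1: read a CSV with a header row and append it to Cassandra."""
    df = spark.read.csv(csv_path, header=True, inferSchema=True)
    (df.write.format(CASSANDRA_FORMAT)
       .options(**cassandra_options(SOURCE_TABLE))
       .mode("append")
       .save())


def transform_and_write(spark):
    """Job 2: read job 1's table, derive a column, write a second table."""
    # Imported lazily so the module can be read without PySpark installed.
    from pyspark.sql import functions as F

    df = (spark.read.format(CASSANDRA_FORMAT)
          .options(**cassandra_options(SOURCE_TABLE))
          .load())
    # Hypothetical transformation: days between two assumed date columns.
    # Assumes the target table has a matching column for every field written.
    out = df.withColumn(
        "days_worked", F.datediff(F.col("last_day"), F.col("first_day"))
    )
    (out.write.format(CASSANDRA_FORMAT)
        .options(**cassandra_options(TARGET_TABLE))
        .mode("append")
        .save())
```

Both functions take an existing `SparkSession`, so the connection settings (master URL, Secure Connect Bundle, credentials) stay in the Spark configuration rather than in the job code.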
If you have not already opened this in Gitpod, then CTRL + Click the button below and get started!
Note: Gitpod will start with two terminals side-by-side. Always use the first one (labeled "all-commands"), except when specified otherwise.
ASTRA DB is the simplest way to run Cassandra with zero operations at all: just push the button and get your cluster. No credit card required, 40M read/write operations and about 80GB storage monthly for free, sufficient to run small production workloads. If you use up your credits, the databases will pause at no charge, and you will be given the option to upgrade to a higher tier.
Follow the database creation guide to create a database. Right-click the following button and choose Open in a new tab.
Field | Value |
---|---|
Database Name | workshops |
Keyspace Name | airflowdemo |
Regions | Select GOOGLE CLOUD, then an Area close to you, then a region with no LOCK 🔒 icon: the LOCKed regions are the regions not accessible to the Free Tier. |
ℹ️ Note: If you already have a database workshops, simply add a keyspace airflowdemo using the Add Keyspace button in the bottom right-hand corner of the DB Dashboard page. You may have to "Resume" the database first in case it is in the "hibernated" state.
While the database is being created, you will also get a Security token (needed to authenticate with your database and start using it): please IGNORE THIS ONE, as we will soon create a new, more powerful token for today.
The status will change from Pending to Active when the database is ready. This usually takes only 2-3 minutes.
Note: this step is very important, as the token generated automatically for you with the database lacks some permissions we'll use in the workshop.
Create a token for your app, using the "Database Administrator" role. Keep it handy for later use (best to download it in CSV format, as the values will not be visible afterward). This will provide authentication later when interacting with the database. Keep in mind that all three strings will be needed today (Client ID, Client Secret, Token).
⚠️ Important
The instructor will show the token creation on screen, but will then destroy it immediately for security reasons.
Astra CLI is a command-line utility to interact with Astra in several ways. As you'll be using it a few times in the following steps, first run the installation:
curl -Ls "https://dtsx.io/get-astra-cli" | bash
Run the setup, providing the "Token" (the AstraCS:... string) when prompted:
source ~/.bashrc # required on any terminal created before installation
astra setup
As a test, you can run
astra db list
or even the DB creation command (which will be a no-op if you created the DB already)
astra db create workshops -k airflowdemo --if-not-exist --wait
You are good to go - to find out more about Astra CLI, have a look at Awesome-Astra.
Use the CLI to download the Secure Connect Bundle to access the DB:
astra db download-scb workshops -f secure-connect-workshops.zip
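Since the later steps expect the bundle under this exact file name, a quick check can catch a failed or misplaced download early. The following is a minimal sketch using only the Python standard library; the function name `check_bundle` is hypothetical, and it verifies only that the file exists and is a readable zip archive, not that its contents are valid for your database.

```python
import zipfile
from pathlib import Path


def check_bundle(path="secure-connect-workshops.zip"):
    """Return the file names inside the bundle, or raise if it is unusable."""
    bundle = Path(path)
    if not bundle.is_file():
        raise FileNotFoundError(
            f"{bundle} not found; run this from the repo's root folder"
        )
    if not zipfile.is_zipfile(bundle):
        raise ValueError(f"{bundle} is not a valid zip archive")
    with zipfile.ZipFile(bundle) as archive:
        return archive.namelist()
```

Calling `check_bundle()` from the repo's root folder returns the list of files in the archive, or raises an error that says what went wrong.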
Alternatively, on your Astra DB Dashboard: locate the workshops database and click on it; download the Secure Connect Bundle from there, renaming it to secure-connect-workshops.zip if needed and checking it is in the repo's root folder.
Use the CLI to launch a small script to create a couple of tables in your database:
astra db cqlsh workshops -f setup.cql
Alternatively, on your Astra DB Dashboard: locate the workshops database and click on it; open setup.cql in the Gitpod editor and paste its contents to the CQL Web console.
We will be using the quick start script that Airflow provides here. You will be asked to configure a password for the "admin" user, which will be needed later to access Airflow's Web interface. Do not forget what you are entering!
Note: Run this command on the second Gitpod terminal (labeled "run-airflow"), as this will not return control to your prompt. You can switch the active terminal, if needed, through the switcher on the lower-right panel of Gitpod.
# Run on the "run-airflow" console!
bash setup.sh
(Get back to the "all-commands" console.) Start the Spark Master with:
# Run on the "all-commands" console!
./spark-3.0.1-bin-hadoop2.7/sbin/start-master.sh
Open port 8081 in the browser (you can do so by running gp preview --external `gp url 8081` and checking your popup blocker), copy the master URL, and paste it in the designated spot below:
./spark-3.0.1-bin-hadoop2.7/sbin/start-slave.sh <master-URL>
Refresh the Spark Master UI (the page on port 8081 you just opened). Check that a "Worker" has now appeared in the list.
mkdir ~/airflow/dags
mv spark_dag.py ~/airflow/dags
gp open properties.conf
Paste your master URL and, in the username/password fields, the "Client ID" and "Client Secret" found in your Database Administrator token.
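To double-check that nothing was left blank after editing, a small parser for the properties file can help. This is a sketch under assumptions: it treats each line as a key and a value separated by whitespace (the style used by Spark's `spark-defaults.conf`), and the three required keys listed are the standard Spark and Spark Cassandra Connector property names, which may differ from what the repository's properties.conf actually contains. The function names are hypothetical.

```python
# Assumed keys; check them against the actual properties.conf in the repo.
REQUIRED = (
    "spark.master",
    "spark.cassandra.auth.username",
    "spark.cassandra.auth.password",
)


def parse_spark_properties(text):
    """Parse whitespace-separated key/value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


def missing_properties(props, required=REQUIRED):
    """Return the required keys that are absent or have an empty value."""
    return [key for key in required if not props.get(key)]
```

For example, `missing_properties(parse_spark_properties(open("properties.conf").read()))` returns an empty list when all three assumed keys have values.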
Open the Airflow UI on port 8080 and check that example_cassandra_etl exists. To open the address, you can run gp preview --external `gp url 8080` (and then check your popup blocker).
The credentials are admin and the password you chose when you ran the setup.sh script.
If it does not exist yet, give it a few seconds to refresh.
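For orientation, a DAG that chains two Spark jobs might look roughly like the sketch below. It is not the repository's spark_dag.py: it assumes Airflow 2 with the Apache Spark provider installed, and the script names, the properties file path, the start date, and the connector package coordinates are hypothetical values for illustration. Only the DAG id and the spark_default connection come from this walkthrough.

```python
from datetime import datetime

DAG_ID = "example_cassandra_etl"
# Hypothetical values: adjust to your checkout and connector version.
PROPERTIES_FILE = "properties.conf"
CONNECTOR_PACKAGE = "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0"


def spark_task_kwargs(task_id, application):
    """Arguments shared by both Spark tasks."""
    return {
        "task_id": task_id,
        "application": application,
        "conn_id": "spark_default",
        "properties_file": PROPERTIES_FILE,
        "packages": CONNECTOR_PACKAGE,
    }


def build_dag():
    # Imported here so the sketch can be read without Airflow installed.
    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import (
        SparkSubmitOperator,
    )

    with DAG(
        dag_id=DAG_ID,
        start_date=datetime(2021, 1, 1),
        # Manual trigger only; newer Airflow releases name this `schedule`.
        schedule_interval=None,
        catchup=False,
    ) as dag:
        load = SparkSubmitOperator(
            **spark_task_kwargs("load_and_write", "load_and_write.py")
        )
        transform = SparkSubmitOperator(
            **spark_task_kwargs("transform_and_write", "transform_and_write.py")
        )
        # The second job reads what the first one wrote, so it must run after it.
        load >> transform
    return dag
```

In a real DAG file under ~/airflow/dags, the result would be assigned at module level (`dag = build_dag()`) so that the scheduler can discover it.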
Update the Spark connection, unpause example_cassandra_etl, and drill down by clicking on example_cassandra_etl.
Under the Admin section of the menu, select Connections, then spark_default, and update the host from the default (yarn) to the Spark master URL found earlier. Save once done.
Select the DAG menu item and return to the dashboard. Unpause example_cassandra_etl, and then click on the example_cassandra_etl link.
previous_employees_by_job_title
Run a SELECT statement on the database table to check it has been populated.
You can run it directly in your console with:
astra db cqlsh workshops -e "select * from airflowdemo.previous_employees_by_job_title where job_title='Dentist' limit 20;"
Alternatively, on your Astra DB Dashboard: locate the workshops database and click on it; then run the following in the CQL Web console:
select * from airflowdemo.previous_employees_by_job_title where job_title='Dentist' limit 20;
days_worked_by_previous_employees_by_job_title
As above, run:
astra db cqlsh workshops -e "select * from airflowdemo.days_worked_by_previous_employees_by_job_title where job_title='Dentist' limit 20;"
Alternatively, on your Astra DB Dashboard: locate the workshops database and click on it; then run the following in the CQL Web console:
select * from airflowdemo.days_worked_by_previous_employees_by_job_title where job_title='Dentist' limit 20;
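The same spot-checks can also be run from Python. The sketch below assumes the DataStax Python driver is installed (`pip install cassandra-driver`) and that the Secure Connect Bundle sits in the current folder; the function names are hypothetical, and the Client ID and Client Secret from your token are passed in as arguments rather than hard-coded.

```python
def select_by_job_title(table, keyspace="airflowdemo", limit=20):
    """Build the parameterised SELECT used to spot-check a table."""
    return (
        f"SELECT * FROM {keyspace}.{table} "
        f"WHERE job_title = %s LIMIT {int(limit)}"
    )


def fetch_rows(table, job_title, client_id, client_secret,
               bundle="secure-connect-workshops.zip"):
    """Connect to Astra DB through the bundle and return the matching rows."""
    # Imported lazily: these come from the cassandra-driver package.
    from cassandra.auth import PlainTextAuthProvider
    from cassandra.cluster import Cluster

    cluster = Cluster(
        cloud={"secure_connect_bundle": bundle},
        auth_provider=PlainTextAuthProvider(client_id, client_secret),
    )
    session = cluster.connect()
    try:
        query = select_by_job_title(table)
        # The job title is bound as a parameter instead of being pasted in.
        return list(session.execute(query, (job_title,)))
    finally:
        cluster.shutdown()
```

For example, `fetch_rows("previous_employees_by_job_title", "Dentist", client_id, client_secret)` mirrors the first query above, and passing the second table name mirrors the other.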
And that wraps up our walkthrough. Again, this is an introduction to setting up a basic Cassandra ETL process run by Airflow and Spark. As mentioned above, these first steps can be expanded into more complex, scheduled, or repeated Cassandra ETL processes run by Airflow and Spark.