Anant Cassandra.Realtime
License: No License Provided
Language: Scala
This project is part of the Event Driven Toolkit for Cassandra, Spark, Kafka initiative from Anant, where we build a distributed message-processing architecture step by step.
✨ This is episode 3
| Description and Link | Tools |
|---|---|
| 1. Reminders on Episode 1, start the Cassandra API | Node, Python, Astra |
| 2. Start and set up Apache Kafka™ | API, Kafka |
| 3. Write into Cassandra | Astra, Kafka |
| 4. Run Apache Spark jobs against DataStax Astra | Astra, Spark, Kafka |
This work was done during the first workshop. The procedure is described step by step in the following README.
For reference, a recording of the first episode is available on YouTube.
ℹ️ Information: During this session we implemented the API in both NodeJS (Express) and Python (Flask); pick whichever you prefer today. We recommend naming your database table `leaves` to keep things simple when following this demo, but you can use a different table name as long as you use that same name consistently throughout the rest of the demo.
Gitpod is a fully online IDE based on Eclipse Theia. To initialize your environment, simply click the button below (CTRL + click to open in a new tab).
Expected Output
For the best connectivity, make sure your REST API's port 8000 is exposed so that we can send requests to it later:
ℹ️ Information: If you don't use this Gitpod workspace frequently enough, it will time out and spin down. If this happens, you can just reopen the workspace and restart the server (`npm start` for NodeJS or `python3 app.py` for Python).
When we tell the Kafka consumer where to send events, we will need the public URL of the API:

```bash
gp url 8000
```
Expected Output
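As a quick smoke test, you can hit that public URL from the workspace terminal. The endpoint path below is hypothetical; substitute a real route from your cassandra.api app:

```bash
# NOTE: /api/leaves is a hypothetical path -- use an actual route from your API
curl -i "$(gp url 8000)/api/leaves"
```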
This is what you have running as of now:
As before, initialize your environment by simply clicking the button below (CTRL + click to open in a new tab). This will open a second Gitpod workspace; the two workspaces will communicate with each other.
💡 ProTip: To view the README in preview mode from Gitpod, right-click on the file and select Open With > Preview.
⚠️ By default, Autosave is not enabled in Gitpod. Don't forget to save your modifications with CTRL+S.
Make sure the Kafka services are up by running `confluent local start`. Note that you don't need to start Kafka Connect yet (and indeed, it won't work until we set it up later in this demo), but the other services should be up.

```bash
confluent local status
# if some services are not up yet, start them (running this again doesn't hurt anything):
confluent local start
```
Expected Output
ℹ️ Information: The specific command you use in the Confluent CLI depends on the version of the CLI you are using. Newer versions of the CLI require `confluent local services start`. In Gitpod, we downloaded v1.6.0 for you, so you can use the shorter syntax: `confluent local <cmd>`.
If you are in Gitpod, we set `$CONFLUENT_HOME` for you. It points to your Confluent binary directory (`/home/gitpod/lib/confluent-5.5.1`). If you are not running this in Gitpod, you will have to set `$CONFLUENT_HOME` yourself.
Create the `record-cassandra-leaves-avro` topic:

```bash
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic record-cassandra-leaves-avro
```
Expected Output
Confirm that the topic `record-cassandra-leaves-avro` now exists:

```bash
$CONFLUENT_HOME/bin/kafka-topics --list --zookeeper localhost:2181
```
Expected Output
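If you want more detail than a bare listing, the standard `kafka-topics --describe` flag shows the partition and replication settings we just created:

```bash
$CONFLUENT_HOME/bin/kafka-topics --describe --zookeeper localhost:2181 --topic record-cassandra-leaves-avro
```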
Make sure your Python environment has `requests` and the other required modules installed:

```bash
cd $PROJECT_HOME
pip install -r python/requirements.txt
```
If you are in Gitpod, we set `$PROJECT_HOME` for you. It is the absolute path to this repository's directory (`/workspace/cassandra.realtime`). If you are not running this in Gitpod, you will have to set `$PROJECT_HOME` yourself.
Expected Output
Register the Avro schema with the Schema Registry:

```bash
python ./kafka/create-schema.py http://localhost:8081 record-cassandra-leaves ./kafka/leaves-record-schema.avsc
```
Expected Output
```bash
curl http://127.0.0.1:8081/subjects
# should return: ["record-cassandra-leaves-value"]
```
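To inspect what was actually registered, the Schema Registry's standard REST API can return the latest version of the subject:

```bash
# standard Schema Registry endpoint; returns the schema id, version, and Avro definition
curl -s http://localhost:8081/subjects/record-cassandra-leaves-value/versions/latest
```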
Start the AKHQ GUI:

```bash
java -Dmicronaut.config.files=$PROJECT_HOME/kafka/akhq/gitpod-akhq-config.yml -jar ${BINARY_DIR}/akhq.jar
```
You can see the AKHQ GUI at http://127.0.0.1:8080/. If you are using Gitpod, we exposed port 8080 for you by default. You can double-check in the ports panel at the bottom of the window.
Expected Output
💡 ProTip : Use this single-line command to open a preview for port 8080 in gitpod:
```bash
gp preview $(gp url 8080)
```
To see the AKHQ Schema registry view specifically:
```bash
gp preview $(gp url 8080)/ui/docker-kafka-server/schema
```
Expected Output
We are now ready to start sending messages to Kafka.
```bash
cd $PROJECT_HOME/python
pip install -r requirements.txt
python3 data_importer.py --config-file-path configs/gitpod-config.ini
```
Expected Output
You can check the topic that has the schema using `kafka-avro-console-consumer`:

(🚨🚨🚨 this can potentially produce a lot of output)

```bash
$CONFLUENT_HOME/bin/kafka-avro-console-consumer --topic record-cassandra-leaves-avro --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=http://localhost:8081
```
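If you'd rather not flood your terminal, the console consumer's standard `--max-messages` flag caps how many records are printed before it exits:

```bash
$CONFLUENT_HOME/bin/kafka-avro-console-consumer --topic record-cassandra-leaves-avro --bootstrap-server localhost:9092 --from-beginning --max-messages 5 --property schema.registry.url=http://localhost:8081
```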
Update the `gitpod-project.properties` file with the URL of your running cassandra.api instance. You will need to change the `api.host` key; it will look something like `api.host=https://8000-c0f5dade-a15f-4d23-b52b-468e334d6abb.ws-us02.gitpod.io`. Again, you can find this URL by running `gp url 8000` in the Gitpod instance running cassandra.api.
Change the `cassandra.keyspace` key as well, to whatever your keyspace is in Astra.
ℹ️ Note: If you don't do this, the consumer will still run, but it will silently fail to write to Cassandra, since its current configuration does not stop on errors.
```bash
cd $PROJECT_HOME/kafka-to-cassandra-worker/src/main/resources/
cp gitpod-project.properties.example gitpod-project.properties
vim gitpod-project.properties
#...
```
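If you prefer to script the edit instead of using vim, a minimal sketch with `sed` might look like this (the host URL and keyspace below are placeholders; use your own `gp url 8000` output and Astra keyspace):

```bash
# Placeholders only -- substitute your own values
sed -i 's|^api.host=.*|api.host=https://8000-<your-workspace-id>.ws-us02.gitpod.io|' gitpod-project.properties
sed -i 's|^cassandra.keyspace=.*|cassandra.keyspace=<your_ks>|' gitpod-project.properties
```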
Now build the jar:

```bash
cd $PROJECT_HOME
mvn -f ./kafka-to-cassandra-worker/pom.xml clean package
```
This will install dependencies and package your jar. If you make changes to your `gitpod-project.properties` file, make sure to run `mvn clean package` again, using the `-f` flag to point to the `pom.xml` file.
There should now be two jars in `./kafka-to-cassandra-worker/target`: one with dependencies, one without. We'll use the one with dependencies:
```bash
cd $PROJECT_HOME
mvn -f ./kafka-to-cassandra-worker/pom.xml exec:java -Dexec.mainClass="org.anant.KafkaAvroConsumer" -Dexec.args="kafka-to-cassandra-worker/target/classes/gitpod-project.properties"
```
Note: if your cassandra.api Gitpod workspace timed out, you might need to reopen it and restart the REST API server. The consumer's offset is set to `latest`, so you won't see anything unless messages are actively coming in.
```bash
cd $PROJECT_HOME/python
python data_importer.py --config-file-path configs/gitpod-config.ini
```
Check the AKHQ topic view at `/ui/docker-kafka-server/topic`:

```bash
gp preview $(gp url 8080)/ui/docker-kafka-server/topic
```

(If AKHQ was already on that page, make sure to refresh the view.) You should see our consumer group (`send-to-cassandra-api-consumer`) listed as a consumer on the topic `record-cassandra-leaves-avro`:
Expected Output
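You can also check the consumer group's progress (and lag) from the CLI with the standard `kafka-consumer-groups` tool, using the group name shown above:

```bash
$CONFLUENT_HOME/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group send-to-cassandra-api-consumer
```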
Check that the Kafka REST Proxy is up and can see our topic:

```bash
curl http://localhost:8082/topics/
curl http://localhost:8082/topics/record-cassandra-leaves-avro
```
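The REST Proxy exposes more than topic metadata; for example, its standard partitions endpoint shows how the topic is laid out:

```bash
curl http://localhost:8082/topics/record-cassandra-leaves-avro/partitions
```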
Now send messages through the REST Proxy:

```bash
cd $PROJECT_HOME/python
python3 data_importer.py --config-file-path configs/gitpod-rest-proxy-config.ini
```
There should now be new messages for you to consume in your Kafka topic.
Expected output
You can use the Kafka Streams Processor API if you want to send messages to Cassandra using the REST API we have been using.
```bash
cd $PROJECT_HOME
mvn -f ./kafka-to-cassandra-worker/pom.xml exec:java -Dexec.mainClass="org.anant.KafkaStreamsAvroConsumer" -Dexec.args="kafka-to-cassandra-worker/target/classes/gitpod-project.properties"
```
Make sure to keep sending messages in another terminal or nothing will happen. You can use the same command as before:
```bash
cd $PROJECT_HOME/python
python3 data_importer.py --config-file-path configs/gitpod-rest-proxy-config.ini
```
We used the Processor API to show what it would look like to write to Cassandra using Kafka Streams and a REST API, but it is generally recommended to use Kafka Connect. We will be using the DataStax connector, but there is also a Confluent Cassandra connector as well as other third-party connectors, if you are interested.
The DataStax Kafka connector also has instructions and a download link on the DataStax website, as well as on Confluent Hub.
We provide a `connect-standalone.properties.example` that is set up to run `kafka-connect-cassandra-sink-1.4.0.jar`. However, you will need to change the keyspace in the topic mapping (`topic.record-cassandra-leaves-avro.<my_ks>.leaves.mapping`). Fields that require changing are marked by `### TODO make sure to change!` in the example file.
Copy and edit `connect-standalone.properties.example`:

```bash
cd $PROJECT_HOME/kafka/connect
cp connect-standalone.properties.gitpod-example connect-standalone.properties
vim connect-standalone.properties
# ...
```
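A quick way to list every field you still need to edit (this just searches for the markers mentioned above):

```bash
grep -n "TODO" connect-standalone.properties
```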
The worker properties file we provide (found at `$PROJECT_HOME/kafka/connect/worker-properties/gitpod-avro-worker.properties`) should work fine without modification in Gitpod. However, if you are not using Gitpod, you will need to change `/workspace/cassandra.realtime` in the plugin path to whatever your `$PROJECT_HOME` is.
REMINDER: create your Astra account here.
If you have not already, make sure that your DataStax Astra secure connect bundle is downloaded.
Display the summary screen and locate the Connect button. On the Connect screen, pick Drivers. Finally, click the Download Secure Bundle button to download the zip, or right-click the button to copy the URL.
If you copied the link:

```bash
cd $PROJECT_HOME/kafka/connect/astra.credentials/
curl -L "<YOUR_LINK>" > secure-connect-<database-name-in-astra>.zip
```
If you have the zip, upload the file to Gitpod with the menu, or drag and drop it into `$PROJECT_HOME/kafka/connect/astra.credentials/`:

```bash
mv ./path/to/astra.credentials/secure-connect-<database-name-in-astra>.zip $PROJECT_HOME/kafka/connect/astra.credentials/
```
Start Kafka Connect using your `connect-standalone.properties` file. First you will have to stop the Connect service that the Confluent CLI started:

```bash
confluent local stop connect
$CONFLUENT_HOME/bin/connect-standalone $PROJECT_HOME/kafka/connect/worker-properties/gitpod-avro-worker.properties $PROJECT_HOME/kafka/connect/connect-standalone.properties
```
Expected output
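Once the worker is up, you can confirm the connector loaded through the Kafka Connect REST API (default port 8083):

```bash
curl -s http://localhost:8083/connectors
```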
Send messages again with the data importer:

```bash
cd $PROJECT_HOME/python
python3 data_importer.py --config-file-path configs/gitpod-config.ini
```
If you're not sure whether it's working, you can delete previously created records in the Astra console before sending messages to Kafka with data_importer.py:

```sql
TRUNCATE <your_ks>.leaves;
```

Then send messages, and run a count:

```sql
SELECT COUNT(*) FROM <your_ks>.leaves;
```
✅ Download Apache Spark and sbt into the `spark` directory:

```bash
curl -L -s https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz | tar xvz -C $PROJECT_HOME/spark
curl -L -s https://github.com/sbt/sbt/releases/download/v1.4.3/sbt-1.4.3.tgz | tar xvz -C $PROJECT_HOME/spark
```
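A quick sanity check that both archives unpacked where the later steps expect them:

```bash
ls $PROJECT_HOME/spark
# expect to see spark-3.0.1-bin-hadoop2.7/ and sbt/ (plus your secure connect bundle after the next step)
```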
✅ Drag and drop a copy of your secure connect bundle into the `/spark` directory
✅ Create 2 tables in DataStax Astra
For Astra Studio:

```sql
CREATE TABLE leaves_by_tag (
  tag text,
  title text,
  tags list<text>,
  url text,
  PRIMARY KEY ((tag), title)
);

CREATE TABLE tags (
  tag text,
  count int,
  PRIMARY KEY (tag)
);
```
For CQLSH:

```sql
CREATE TABLE <your-keyspace>.leaves_by_tag (
  tag text,
  title text,
  tags list<text>,
  url text,
  PRIMARY KEY ((tag), title)
);

CREATE TABLE <your-keyspace>.tags (
  tag text,
  count int,
  PRIMARY KEY (tag)
);
```
✅ Start the Spark master and a worker:

```bash
cd $PROJECT_HOME/spark/spark-3.0.1-bin-hadoop2.7/
./sbin/start-master.sh
./sbin/start-slave.sh <master-url>
```
💡 ProTip : Use this single-line command to open a preview for port 8080 in gitpod to get the Spark master URL:
```bash
gp preview $(gp url 8080)
```
Important note: you may have to open a port slightly above 8080, depending on what is already running in your Gitpod/local instance (e.g. AKHQ).
Once the Spark master is started, a mini window should open in Gitpod; it will show which port to use with the `gp preview` command, and the Spark master URL will appear at the top.
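If you prefer the terminal, the master URL is also printed in the master's log file; a minimal check, assuming the default log location for the Spark 3.0.1 layout used here:

```bash
grep "spark://" $PROJECT_HOME/spark/spark-3.0.1-bin-hadoop2.7/logs/*Master*.out
```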
Expected Output once master and worker started
✅ Start the sbt server from the `spark-cassandra` directory:

```bash
cd $PROJECT_HOME/spark/spark-cassandra/
../sbt/bin/sbt
```
Expected Output (This may take a minute, but you should see this when done)
✅ Create the `properties.conf` file. We provide a `properties.example` file that is set up to run with our Spark jobs. However, you will need to put your own specific configs into the designated spots. Fields that require changing are marked by `### TODO` in the example file. IMPORTANT: remember to keep one whitespace between each parameter and its value.
✅ Copy the `properties.example` file:

```bash
cd $PROJECT_HOME/spark
cp properties.example properties.conf
vim properties.conf
```
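For orientation, here is a hypothetical sketch of the kind of lines you will be filling in. These are common Spark Cassandra Connector settings for Astra, but the authoritative list is whatever `properties.example` marks with `### TODO`:

```bash
# Illustrative only -- check properties.example for the real TODO fields.
# Note the single space between each parameter and its value, per the note above.
cat >> properties.conf <<'EOF'
spark.cassandra.connection.config.cloud.path secure-connect-<your-database-name>.zip
spark.cassandra.auth.username <your-client-id>
spark.cassandra.auth.password <your-client-secret>
EOF
```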
✅ Run `assembly` in the sbt server terminal
In the first job, we are going to read the Kafka stream, manipulate the data, and save the data into the `leaves_by_tag` table we created earlier.
```bash
./bin/spark-submit --class sparkCassandra.LeavesByTag --properties-file $PROJECT_HOME/spark/properties.conf --files $PROJECT_HOME/spark/secure-connect-<your-database-name>.zip $PROJECT_HOME/spark/spark-cassandra/target/scala-2.12/spark-cassandra-assembly-0.1.0-SNAPSHOT.jar
```
Expected Output Once the Job is Watching for the Kafka Stream
```bash
$CONFLUENT_HOME/bin/connect-standalone $PROJECT_HOME/kafka/connect/worker-properties/gitpod-avro-worker.properties $PROJECT_HOME/kafka/connect/connect-standalone.properties
```
Expected output
```bash
cd $PROJECT_HOME/python
python3 data_importer.py --config-file-path configs/gitpod-config.ini
```
✅ Stop the Spark job with CTRL + C once there is a steady stream of the following in the terminal, with no changes:
✅ Check count of rows with the tag of 'spark' in CQLSH or Astra Studio
CQLSH:
```sql
select tag, count(*) from <your-keyspace>.leaves_by_tag where tag='spark';
```
Expected Output
Astra Studio:
```sql
select tag, count(*) from leaves_by_tag where tag='spark';
```
Expected Output
In this job, we are going to take the data we sent via Kafka into the `leaves` table, transform it with Apache Spark, and write the transformed data into the `tags` table we created during setup.
```bash
./bin/spark-submit --class sparkCassandra.Tags --properties-file $PROJECT_HOME/spark/properties.conf --files $PROJECT_HOME/spark/secure-connect-<your-database-name>.zip $PROJECT_HOME/spark/spark-cassandra/target/scala-2.12/spark-cassandra-assembly-0.1.0-SNAPSHOT.jar
```
The job will complete on its own, so you do not have to manually end it.
Expected Output
CQLSH:

```sql
select * from <your-keyspace>.tags where tag='spark';
```
Expected Output
Astra Studio:
```sql
select * from tags where tag='spark';
```
Expected Output
Join Our Newsletter!
Sign up below to receive email updates and see what's going on with our company.