Airavat is a metric interceptor and a job watchdog for Spark applications. It also features an interactive UI that shows all running Spark applications, their jobs, and SQL queries, along with their metrics.
- ✔️ Super-fast integration, just add the jar and you're good to go
- ✔️ Monitor Jobs, Queries and Executions
- ✔️ Collect Job Metrics such as Disk Spill, Shuffle Read Data, Result Size etc.
- ✔️ Collect Query Plans and view them on the UI
- ✔️ Set Limits / Thresholds for maximum shuffle / maximum result size / duration of a job.
- ✔️ Kill potentially disruptive / resource hogging jobs before they impact the overall health and stability of your application.
- ✔️ Trace which jobs belong to which queries and have this information persisted for analysis even after your application shuts down.
- ✔️ Aggregate Data across all spark applications
- ✔️ Works with all spark application modes - local, yarn-client, cluster etc.
- ✔️ (Coming Soon) - Predict run times / duration for your spark queries!
- Spark 2.3.0 +
- Scala / sbt to compile and build the jar
- ReactJS, Node/NPM and dependencies for the frontend/UI
- FastAPI, SQLAlchemy for the server.
git clone https://github.com/alivcor/airavat.git
sbt clean package publishLocal
To test run, download the Netflix Data CSV file from Kaggle and place it under the `data/` directory.
sbt run
Linux/MacOS
--conf "spark.extraListeners=com.iresium.airavat.AiravatJobListener"
--conf "spark.airavat.collectJobMetrics=true"
--conf "spark.airavat.collectQueryMetrics=true"
--conf "spark.airavat.collectQueryPlan=true"
--jars /path/to/airavat-0.1.jar
Scala Application
val spark = SparkSession
.builder()
.master("local")
.appName("My Spark Application")
.config("spark.extraListeners", "com.iresium.airavat.AiravatJobListener")
.config("spark.airavat.collectJobMetrics", "true")
.config("spark.airavat.collectQueryMetrics", "true")
.config("spark.airavat.collectQueryPlan", "true")
.getOrCreate()
cd server
pip install -r requirements.txt
uvicorn main:app
You can also run it in the background with nohup: `nohup uvicorn main:app >> airavat_server.log &`
cd ui
npm install
npm start
1. Change `sparkVersion` to the desired Spark version.
2. Build the sbt package again.
3. Make sure to update the jars.
Currently, Airavat supports the following limits:
- spark.airavat.maxTotalDuration
- spark.airavat.maxTotalTasks
- spark.airavat.maxTotalDiskSpill
- spark.airavat.maxTotalBytesRead
- spark.airavat.maxTotalBytesWritten
- spark.airavat.maxTotalResultSize
- spark.airavat.maxTotalShuffleReadBytes
- spark.airavat.maxTotalShuffleWriteBytes
- spark.airavat.maxJobDuration
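The limits above are ordinary Spark properties, so they can be passed the same way as the other `spark.airavat.*` settings. Here is a minimal sketch of a helper that renders a set of limits as `--conf` arguments. The helper name `spark_conf_args` and the values are hypothetical; this README does not state the units or defaults of any limit, so check the source before picking real numbers.

```python
def spark_conf_args(settings):
    """Render a dict of Spark properties as a flat list of --conf arguments."""
    args = []
    for key, value in settings.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


# Hypothetical values: units and defaults are NOT documented in this README.
limits = {
    "spark.airavat.maxJobDuration": 600,
    "spark.airavat.maxTotalShuffleReadBytes": 10_000_000_000,
}

# Print the arguments as they would appear on a spark-submit command line.
print(" ".join(spark_conf_args(limits)))
```

Only set the limits you need; which behavior applies to unset limits is not described here.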
Airavat uses Slick (Functional Relational Mapping for Scala), which can be configured to work with different databases.
Out of the box, Airavat uses SQLite (my favorite and the simplest DB in the world!). However, this can be changed based on resource and capacity requirements. By default, Airavat creates and uses `airavat_db` as the database identifier, which is picked up from `application.conf` (under `src/main/resources/` in the source code). You can add a DB configuration there, and then set `spark.airavat.dbName` if your identifier is anything other than `airavat_db`.
airavat_db = {
driver = "org.sqlite.JDBC",
url = "jdbc:sqlite:/path/to/airavat.db",
connectionPool = disabled
keepAliveConnection = true
}
The backend server, however, uses SQLAlchemy to establish database connections. Please make sure your backend server points to the same database as your Spark application!
The backend server uses a config file, `config/server.conf`. Configure the SQLAlchemy URL to your database in the config file.
[database]
url = sqlite:////path/to/airavat.db
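Because the Spark side uses a JDBC URL and the server uses a SQLAlchemy URL, it is easy to point them at different files by accident. The sketch below is one way to sanity-check that both resolve to the same SQLite file. It uses only the standard library; the helper names are hypothetical, it handles SQLite URLs only, and the config text is inlined instead of being read from `config/server.conf`.

```python
import configparser

# Inlined copy of the server config shown above (normally config/server.conf).
SERVER_CONF = """
[database]
url = sqlite:////path/to/airavat.db
"""


def sqlite_path_from_sqlalchemy(url):
    """Return the file path from a SQLAlchemy SQLite URL (sqlite:///<path>)."""
    prefix = "sqlite:///"
    if not url.startswith(prefix):
        raise ValueError(f"not a SQLAlchemy SQLite URL: {url}")
    return url[len(prefix):]


def sqlite_path_from_jdbc(url):
    """Return the file path from a JDBC SQLite URL (jdbc:sqlite:<path>)."""
    prefix = "jdbc:sqlite:"
    if not url.startswith(prefix):
        raise ValueError(f"not a JDBC SQLite URL: {url}")
    return url[len(prefix):]


# interpolation=None so that a '%' in a URL is not treated as a placeholder.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(SERVER_CONF)

server_path = sqlite_path_from_sqlalchemy(parser["database"]["url"])
spark_path = sqlite_path_from_jdbc("jdbc:sqlite:/path/to/airavat.db")
same_database = server_path == spark_path
print(same_database)
```

Note the four slashes in the SQLAlchemy URL: three belong to the URL scheme and the fourth starts the absolute path.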
DynamoDB can be used as a sink for Airavat. Set the SQLAlchemy URL as follows:
[database]
url = amazondynamodb:///?Access Key=xxx&Secret Key=xxx&Domain=amazonaws.com&Region=OREGON
Optionally, you can add support for PynamoDB, which provides a richer interface.
The setting `spark.airavat.dbTimeoutSeconds` dictates the timeout for Airavat to persist the metrics/data to the DB on a Job End event. All actions are best-effort and never guaranteed. The timeout is set to 120 seconds by default.
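To illustrate what "best-effort with a timeout" means in practice, here is a minimal Python sketch. It is not Airavat's implementation (the listener is written in Scala); the function name `persist_best_effort` is hypothetical, and `write` stands in for whatever call saves the metrics. The write is attempted once, and a timeout or error is reported as a failure instead of being raised.

```python
import concurrent.futures
import time


def persist_best_effort(write, timeout_seconds=120.0):
    """Run write() on a worker thread; return True only if it finishes in time."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(write)
    try:
        future.result(timeout=timeout_seconds)
        return True
    except concurrent.futures.TimeoutError:
        # The write took too long: give up waiting and carry on.
        return False
    except Exception:
        # The write itself failed: best-effort means the failure is swallowed.
        return False
    finally:
        # Do not block the caller while a slow write is still running.
        executor.shutdown(wait=False)


# A quick write succeeds; a slow one is abandoned after the timeout.
print(persist_best_effort(lambda: None, timeout_seconds=1.0))
print(persist_best_effort(lambda: time.sleep(0.3), timeout_seconds=0.05))
```

The trade-off is that metrics for a job can be lost if the database is slow or unavailable, in exchange for not blocking on the write indefinitely.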
Airavat retains several maps in memory which hold job and query info. Most of the elements are evicted from the maps automatically. However, if jobs are not attached to a SQL query, or are not evicted because of a failure, Airavat periodically evicts entries from the maps to avoid building up a large memory footprint. The setting `spark.airavat.maxJobRetain` dictates how many jobs Airavat retains in memory at any given moment.
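The idea of capping retained jobs can be sketched with a bounded map. This is an illustration only, written in Python: the class name `BoundedJobMap` is hypothetical, and it assumes oldest-first eviction on insert, whereas this README only says that eviction happens periodically and does not specify the policy.

```python
from collections import OrderedDict


class BoundedJobMap:
    """Keep at most max_retain jobs, evicting the oldest entries first."""

    def __init__(self, max_retain):
        if max_retain < 1:
            raise ValueError("max_retain must be at least 1")
        self.max_retain = max_retain
        self._jobs = OrderedDict()

    def put(self, job_id, info):
        """Store a job and return the ids evicted to stay within the cap."""
        self._jobs[job_id] = info
        self._jobs.move_to_end(job_id)  # re-inserted jobs count as newest
        evicted = []
        while len(self._jobs) > self.max_retain:
            old_id, _ = self._jobs.popitem(last=False)  # drop the oldest
            evicted.append(old_id)
        return evicted

    def remove(self, job_id):
        """Normal eviction path, e.g. when a job ends cleanly."""
        return self._jobs.pop(job_id, None)

    def __len__(self):
        return len(self._jobs)

    def __contains__(self, job_id):
        return job_id in self._jobs


jobs = BoundedJobMap(max_retain=2)
jobs.put(1, {"name": "job-1"})
jobs.put(2, {"name": "job-2"})
evicted = jobs.put(3, {"name": "job-3"})  # exceeds the cap, so job 1 is dropped
print(evicted, len(jobs))
```

A larger cap keeps more history available in memory at the cost of a bigger footprint on the driver.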
Looking for contributors! You are welcome to raise issues or send a pull request.
I haven't been able to add comments to the code and a lot of the code here has been hastily written. Suggestions are welcome and pull requests are highly encouraged.
Link to ScalaDoc/JavaDoc can be found here
- Abhinandan Dubey - @alivcor
This project is licensed under the MIT License - see the LICENSE.md file for details