Modify the .env file with the credentials and global configs:
```env
HIVE_VERSION=3.1.3
HIVE_METASTORE_DATABASE_USER=hive_metastore
HIVE_METASTORE_DATABASE_PASSWORD=hive-password
MINIO_PORT=9001
MINIO_ROOT_USER=miniouser
MINIO_ROOT_PASSWORD=miniopassword
MINIO_BUCKET=prin
S3_PRE_SIGNED_URL_EXPIRATION_SECONDS=3600
SUPERSET_VERSION=4.1.2
SUPERSET_PORT=7890
SUPERSET_ADMIN_USERNAME=admin
SUPERSET_ADMIN_PASSWORD=admin
RANGER_VERSION=2.7.0
RANGER_PORT=6080
RANGER_DEBUG_ADMIN=false
KAFKA_VERSION=3.9.1
TRINO_VERSION=471
TRINO_PORT=8081
DATA_LOADING_WEBAPP_PORT=5000
TASK_APIS_PORT=5001
ATLAS_PORT=21000
```

Add the file task-dispatcher/docker-registry-token.txt containing the registry token (with no trailing newline) of the Docker registry hosting the task image. The auth token for the registry can be found in ~/.docker/config.json. To verify that task-dispatcher/docker-registry-token.txt is correct, run:
```bash
cat task-dispatcher/docker-registry-token.txt | base64 -d
```

It should return a JSON like:
```json
{
  "username": "your-user",
  "password": "the-auth-token-found-in~/.docker/config.json",
  "serveraddress": "ghcr.io"
}
```

Note: if you want to change a credential, search (and replace) it in the entire repo because, for example, the HIVE_METASTORE_DATABASE_PASSWORD is created in the postgres/init/hive-metastore-init.sql script.
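If you prefer to generate the file from scratch, a minimal sketch is to base64-encode a JSON document with the fields shown above (the credentials and registry below are placeholders; `printf '%s'` avoids the trailing newline):

```bash
# Sketch: build task-dispatcher/docker-registry-token.txt by hand.
# Replace the placeholders with your registry user and the auth token from
# ~/.docker/config.json; ghcr.io is just an example server address.
printf '%s' '{"username":"your-user","password":"your-auth-token","serveraddress":"ghcr.io"}' \
  | base64 -w 0 > task-dispatcher/docker-registry-token.txt
```

On macOS, drop `-w 0` (BSD base64 does not wrap lines by default).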
Create the infrastructure with:
```bash
make up
```

To set profiles:

```bash
make up COMPOSE_PROFILES=your-profile
```

If COMPOSE_PROFILES is unset, it defaults to `*` (all profiles are activated).
You can also limit the resources of any container by including the docker-compose-resource-limits.yml file. To add further compose files:
```bash
make up ADDITIONAL_COMPOSE_FILES=docker-compose-resource-limits.yml
```

Besides, you can specify:

- An env file, by setting `ENV_FILE` to the env file path (default: `.env`)
- A limited number of services, by filling the `SERVICES` variable with the names of the services you want to run (see the sketch below)
- Which options should be passed to `docker compose up`, with `COMPOSE_COMMAND_OPTIONS=...`
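For example, a sketch combining `ENV_FILE` and `SERVICES` (the env file name is a placeholder; trino and hive-metastore are services from this stack):

```bash
# Sketch: bring up only some services with a custom env file.
# .env.local is a placeholder; how SERVICES is parsed (here space-separated)
# should be checked against the Makefile.
make up ENV_FILE=.env.local SERVICES="trino hive-metastore"
```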
For instance, to enable the development mode, run:
```bash
make up COMPOSE_PROFILES=local-dev \
  ADDITIONAL_COMPOSE_FILES=docker-compose-development.yml \
  COMPOSE_COMMAND_OPTIONS="--build --watch"
```

To first render your Docker Compose template, replace `make up` with `make up-rendered`.
The user-dashboard service exposes a dashboard for uploading patients and MiR results at /data-loading/patients and /data-loading/mir-results, respectively. Valid extensions for uploaded files are .xls, .xlsx, and .ods. When a new Excel file is uploaded, an event with the collected data is sent to the devprin.patients/devprin.mir-results topic. These events can be queried from Trino thanks to the Kafka connector for Trino.
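For quick tests without the browser, an upload can presumably also be scripted; the sketch below assumes a plain multipart form upload with a field named `file` and that the dashboard listens on DATA_LOADING_WEBAPP_PORT, neither of which is verified here:

```bash
# Sketch: upload a patients spreadsheet to the data-loading dashboard.
# The multipart field name "file" is an assumption; check the user-dashboard
# form. The port comes from DATA_LOADING_WEBAPP_PORT in .env (5000).
curl -X POST \
  -F "file=@patients.xlsx" \
  "http://localhost:5000/data-loading/patients"
```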
To run any query:

```bash
docker compose exec -it trino bash
# then, in the Trino container shell, run:
trino --server https://localhost:8443 --catalog kafka --schema devprin --user test_user1 --password
```

On the Trino CLI you can then select all the records:

```sql
SELECT * FROM "medical-records";
```

To produce test data with Kafka:
```bash
docker compose exec -it kafka-broker-0 bash
# --topic must be a previously created topic; add more brokers to
# --broker-list if needed, e.g. localhost:9092,kafka-broker-1:9092
kafka-console-producer.sh \
  --topic devprin.test \
  --property parse.key=true \
  --property key.separator="|" \
  --broker-list localhost:9092
1|{"prop1":"test", "prop2": 10}
```

To consume data with Kafka:
```bash
docker compose exec -it kafka-broker-0 bash
# --property print.key=true is omitted because the key is a byte object
# and cannot be deserialized by the console consumer.
kafka-console-consumer.sh \
  --topic devprin.medical-records \
  --property key.separator="-" \
  --bootstrap-server localhost:9092
```

First, enable authentication in Trino (see the properties in the config.properties file). Authentication requires TLS, but you can bypass it by setting:

```properties
http-server.authentication.allow-insecure-over-http=true
```
In the demo, however, HTTPS is used. Now you can authenticate to the server as a different user (test_user1), which must be stored in the password.db file.
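Trino's file-based password authenticator (password-authenticator.name=file) reads bcrypt entries that can be generated with htpasswd; the path below is an assumption, so point it at the password.db file actually mounted into the Trino container:

```bash
# Sketch: add test_user1 to the Trino password file with a bcrypt hash.
# trino/password.db is an assumed path; use the file referenced by
# file.password-file in password-authenticator.properties.
htpasswd -B -C 10 -c trino/password.db test_user1
# Append more users to an existing file by omitting -c.
```

Once the user is present in the password file, log in: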
```bash
trino --server https://localhost:8443 --user test_user1 --password
```

Insert the password and run a simple query. It will return an error:

```text
Access Denied: Principal test_user1 cannot become user test_user1
```
We have not added the user to Ranger yet. So, create a new Ranger User called test_user1 and modify the Trino policies:
- add to `test_user1` the `impersonate` permission for resource `Trino user: *`
- add to `test_user1` all permissions for resources:
  - `Trino catalog: *`
  - `Trino schema: *`
  - `Trino table: *`
  - `Trino column: *`
- add to `test_user1` the `execute` permission for resource `Query ID: *`
In the dev deployment, these policies are already present and are called `all - trinouser`, `all - catalog, schema, table, column`, and `all - queryid`.
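User creation can also be scripted instead of using the Ranger UI; the endpoint and payload below follow common usage of the Ranger xusers REST API and should be verified against your Ranger 2.7.0 instance before relying on them:

```bash
# Sketch: create test_user1 through the Ranger Admin REST API.
# The /service/xusers/secure/users endpoint and the payload fields are
# assumptions based on common Ranger usage; the password is a placeholder.
curl -u 'admin:rangerR0cks!' \
  -H 'Content-Type: application/json' \
  -X POST 'http://localhost:6080/service/xusers/secure/users' \
  -d '{
        "name": "test_user1",
        "firstName": "test_user1",
        "password": "Test_user1_password",
        "userRoleList": ["ROLE_USER"]
      }'
```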
To test the insertion on S3, create a table from Trino:
```bash
trino --catalog hive --schema default
```

Then, to check the CDC output:

```sql
SELECT * FROM patient_records;
```

To create a new table, use the following commands:
```sql
CREATE TABLE example (
  id INT,
  name VARCHAR
)
WITH (
  format = 'PARQUET'
);

INSERT INTO example
VALUES
  (1, 'Name1'),
  (2, 'Name2');

SELECT * FROM example;
```

You can get the objects from MinIO with:

```bash
mc get /data/${MINIO_BUCKET}/metadata/example/${object_name}
```
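Alternatively, if you run mc from the host rather than inside the MinIO container, you can register an alias first; the alias name is arbitrary, and whether MINIO_PORT (9001) maps to the S3 API or to the console in this deployment is an assumption to verify:

```bash
# Sketch: register an mc alias for the local MinIO and browse the bucket.
# Credentials and bucket name come from .env; check that the port below is
# the S3 API port actually exposed by this deployment.
mc alias set local http://localhost:9001 miniouser miniopassword
mc ls --recursive local/prin/metadata/example/
```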
Kafka is a pub/sub system that can store events indefinitely, but with Trino we cannot perform SQL INSERTs into Kafka topics. That is why we need to store them on S3 too, and leverage the Trino integration with Ranger to possibly anonymize the stored data. We are using the Hive metastore to tell Trino where to keep the schema of the S3 tables and the format they should have (Parquet). Then, there is a CDC script (trino/cli/cdc.sh) that periodically copies the Kafka events sent after the start time and before the current time into S3, into a table called hive.default.patient_records.
The "flush" delay is determined by the env variable FLUSH_DELAY_SECONDS (default 15 seconds)
Hive runs the schema initialization script every time it starts, unless the env variable IS_RESUME=true. However, initializing the schema after it has already been initialized causes the container to fail (only in version 3.1.3; from 4.x this is not an issue, but we need 3.x for Atlas compatibility). This variable is set in the docker compose only when the hive/.hive_initialized file exists. This file:
- is created/updated after a `docker compose up`
- is deleted after running `make clean-all`. If you do not need a complete clean, do not execute it.
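Conceptually, the check looks like the sketch below (the real logic is in the Makefile / compose configuration and may be expressed differently):

```bash
# Sketch: skip Hive schema initialization when the marker file already exists.
if [ -f hive/.hive_initialized ]; then
  export IS_RESUME=true
else
  export IS_RESUME=false
fi
docker compose up -d
```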
It might happen that the schema initialization step fails during the Metastore startup. Even though it depends on the ranger-db service health, it unexpectedly throws a connection failure error. Just restart the hive-metastore container and it should work.
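For example:

```bash
# Retry the schema initialization by restarting the Metastore container.
docker compose restart hive-metastore
```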
To import Hive columns in Atlas:
```bash
docker compose exec -it --user root hive-metastore bash
./import-hive.sh
# Username: admin
# Password: admin
```
Then exit from the container and run the atlas initialization script:
```bash
./atlas/initialize.sh --endpoint http://localhost:21000 --credentials "admin:admin"
```

Initialize Ranger with:

```bash
./ranger/initialize.sh --endpoint http://localhost:6080 --credentials "admin:rangerR0cks!"
```

Then restart Superset:

```bash
docker compose restart superset
```

- this restart is possibly needed because of the exceptions in /var/log/ranger/ranger_admin_sql.log