-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #233 from RADAR-base/release-0.5.11
Release 0.5.11
- Loading branch information
Showing
13 changed files
with
460 additions
and
168 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
#!/bin/bash | ||
|
||
# Create topics | ||
echo "Creating RADAR-base topics on Confluent Cloud..." | ||
|
||
if ! radar-schemas-tools cc-topic-create -c $CC_CONFIG_FILE_PATH -p $KAFKA_NUM_PARTITIONS -r $KAFKA_NUM_REPLICATION merged; then | ||
echo "FAILED TO CREATE TOPICS ... Retrying again" | ||
if ! radar-schemas-tools cc-topic-create -c $CC_CONFIG_FILE_PATH -p $KAFKA_NUM_PARTITIONS -r $KAFKA_NUM_REPLICATION merged; then | ||
echo "FAILED TO CREATE TOPICS" | ||
exit 1 | ||
else | ||
echo "Created topics at second attempt" | ||
fi | ||
else | ||
echo "Topics created." | ||
fi | ||
|
||
echo "Registering RADAR-base schemas..." | ||
|
||
if ! radar-schemas-tools register --force -u $CC_API_KEY -p $CC_API_SECRET "${KAFKA_SCHEMA_REGISTRY}" merged; then | ||
echo "FAILED TO REGISTER SCHEMAS" | ||
exit 1 | ||
fi | ||
|
||
echo "Schemas registered." | ||
|
||
echo "*******************************************" | ||
echo "** RADAR-base topics and schemas ready **" | ||
echo "*******************************************" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
158 changes: 158 additions & 0 deletions
158
...-schemas-tools/src/main/java/org/radarcns/schema/registration/AbstractTopicRegistrar.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
package org.radarcns.schema.registration; | ||
|
||
import static org.radarcns.schema.CommandLineApp.matchTopic; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.regex.Pattern; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import javax.validation.constraints.NotNull; | ||
|
||
import org.apache.kafka.clients.admin.AdminClient; | ||
import org.apache.kafka.clients.admin.ListTopicsOptions; | ||
import org.apache.kafka.clients.admin.NewTopic; | ||
import org.radarcns.schema.specification.SourceCatalogue; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public abstract class AbstractTopicRegistrar implements TopicRegistrar { | ||
static final int MAX_SLEEP = 32; | ||
private static final Logger logger = LoggerFactory.getLogger(AbstractTopicRegistrar.class); | ||
private Set<String> topics; | ||
|
||
@Override | ||
public int createTopics(@NotNull SourceCatalogue catalogue, int partitions, short replication, | ||
String topic, String match) { | ||
Pattern pattern = matchTopic(topic, match); | ||
|
||
if (pattern == null) { | ||
return createTopics(catalogue, partitions, replication) ? 0 : 1; | ||
} else { | ||
List<String> topicNames = | ||
catalogue.getTopicNames().filter(s -> pattern.matcher(s).find()) | ||
.collect(Collectors.toList()); | ||
|
||
if (topicNames.isEmpty()) { | ||
logger.error("Topic {} does not match a known topic." | ||
+ " Find the list of acceptable topics" | ||
+ " with the `radar-schemas-tools list` command. Aborting.", pattern); | ||
return 1; | ||
} | ||
return createTopics(topicNames.stream(), partitions, replication) ? 0 : 1; | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Create all topics in a catalogue. | ||
* | ||
* @param catalogue source catalogue to extract topic names from | ||
* @param partitions number of partitions per topic | ||
* @param replication number of replicas for a topic | ||
* @return whether the whole catalogue was registered | ||
*/ | ||
private boolean createTopics(@NotNull SourceCatalogue catalogue, int partitions, | ||
short replication) { | ||
ensureInitialized(); | ||
return createTopics(catalogue.getTopicNames(), partitions, replication); | ||
} | ||
|
||
|
||
@Override | ||
public boolean createTopics(Stream<String> topicsToCreate, int partitions, short replication) { | ||
ensureInitialized(); | ||
try { | ||
refreshTopics(); | ||
logger.info("Creating topics. Topics marked with [*] already exist."); | ||
|
||
List<NewTopic> newTopics = topicsToCreate.sorted().distinct().filter(t -> { | ||
if (this.topics != null && this.topics.contains(t)) { | ||
logger.info("[*] {}", t); | ||
return false; | ||
} else { | ||
logger.info("[ ] {}", t); | ||
return true; | ||
} | ||
}).map(t -> new NewTopic(t, partitions, replication)).collect(Collectors.toList()); | ||
|
||
if (!newTopics.isEmpty()) { | ||
getKafkaClient().createTopics(newTopics).all().get(); | ||
logger.info("Created {} topics. Requesting to refresh topics", newTopics.size()); | ||
refreshTopics(); | ||
} else { | ||
logger.info("All of the topics are already created."); | ||
} | ||
return true; | ||
} catch (Exception ex) { | ||
logger.error("Failed to create topics {}", ex.toString()); | ||
return false; | ||
} | ||
} | ||
|
||
@Override | ||
public boolean refreshTopics() throws InterruptedException { | ||
ensureInitialized(); | ||
logger.info("Waiting for topics to become available."); | ||
int sleep = 10; | ||
int numTries = 10; | ||
|
||
topics = null; | ||
ListTopicsOptions opts = new ListTopicsOptions().listInternal(true); | ||
for (int tries = 0; tries < numTries; tries++) { | ||
try { | ||
topics = getKafkaClient().listTopics(opts).names().get(sleep, TimeUnit.SECONDS); | ||
} catch (ExecutionException e) { | ||
logger.error("Failed to list topics from brokers: {}." | ||
+ " Trying again after {} seconds.", e.toString(), sleep); | ||
Thread.sleep(sleep * 1000L); | ||
sleep = Math.min(MAX_SLEEP, sleep * 2); | ||
continue; | ||
} catch (TimeoutException e) { | ||
// do nothing | ||
} | ||
if (topics != null && !topics.isEmpty()) { | ||
break; | ||
} | ||
if (tries < numTries - 1) { | ||
logger.warn("Topics not listed yet after {} seconds", sleep); | ||
} else { | ||
logger.error("Topics have not become available. Failed to wait on Kafka."); | ||
} | ||
sleep = Math.min(MAX_SLEEP, sleep * 2); | ||
} | ||
|
||
if (topics == null || topics.isEmpty()) { | ||
return false; | ||
} else { | ||
Thread.sleep(5000L); | ||
return true; | ||
} | ||
} | ||
|
||
@Override | ||
public Set<String> getTopics() { | ||
ensureInitialized(); | ||
return Collections.unmodifiableSet(topics); | ||
} | ||
|
||
|
||
@Override | ||
public void close() { | ||
if (getKafkaClient() != null) { | ||
getKafkaClient().close(); | ||
} | ||
} | ||
|
||
/** | ||
* Returns an instance of {@code AdminClient} for use. | ||
* | ||
* @return instance of AdminClient. | ||
*/ | ||
abstract AdminClient getKafkaClient(); | ||
|
||
} |
Oops, something went wrong.