Skip to content

Commit

Permalink
kcat support in aiven-client
Browse files Browse the repository at this point in the history
added required functions

remove linting errors

modified readme to kcat

fixed space

removed unnecessary code duplication added a param

added comment
  • Loading branch information
ftisiot committed Aug 17, 2022
1 parent f30b306 commit dfc4b4f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ Each project has its own CA cert, and other services (notably Kafka) use mutualT

$ avn service user-creds-download --username avnadmin <service>

For working with `kafkacat <https://github.com/edenhill/kafkacat>`_ (see also our `help article <https://help.aiven.io/en/articles/2607674-using-kafkacat>`_ ) or the command-line tools that ship with Kafka itself, a keystore and trustore are needed. By specifying which user's creds to use, and a secret, you can generate these via ``avn`` too::
For working with `kcat <https://github.com/edenhill/kcat>`_ (see also our `help article <https://developer.aiven.io/docs/products/kafka/howto/kcat.html>`_ ) or the command-line tools that ship with Kafka itself, a keystore and trustore are needed. By specifying which user's creds to use, and a secret, you can generate these via ``avn`` too::

$ avn service user-kafka-java-creds --username avnadmin -p t0pS3cr3t <service>

Expand Down
56 changes: 54 additions & 2 deletions aiven/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,7 @@ def _get_store_from_args(self) -> Store:
store = Store.skip
return store

# This method should be deprecated and removed over time. Check service__connection_info__kcat
@arg.project
@arg.service_name
@arg("-r", "--route", choices=("dynamic", "privatelink", "public"))
Expand All @@ -1366,7 +1367,8 @@ def service__connection_info__kafkacat(self) -> None:
privatelink_connection_id=self._get_privatelink_connection_id_from_args(),
username=self.args.username,
)
cmd = ci.kafkacat(
cmd = ci.kcat(
"kafkacat",
store,
get_project_ca=self._get_project_ca,
ca_path=self.args.ca_path,
Expand All @@ -1380,7 +1382,57 @@ def service__connection_info__kafkacat(self) -> None:
privatelink_connection_id=self._get_privatelink_connection_id_from_args(),
username=self.args.username,
)
cmd = sasl_ci.kafkacat(
cmd = sasl_ci.kcat(
"kafkacat",
store,
get_project_ca=self._get_project_ca,
ca_path=self.args.ca_path,
)
else:
raise NotImplementedError(self.args.kafka_authentication_method)

print(" ".join(cmd))

@arg.project
@arg.service_name
@arg("-r", "--route", choices=("dynamic", "privatelink", "public"))
@arg("-p", "--privatelink-connection-id")
@arg("-a", "--kafka-authentication-method", choices=("certificate", "sasl"), default="certificate")
@arg("-u", "--username", default="avnadmin")
@arg("--ca", default="ca.pem", dest="ca_path")
@arg("--client-cert", default="service.crt", dest="client_cert_path")
@arg("--client-key", default="service.key", dest="client_key_path")
@arg("-w", "--write", action="store_true", help="Save certificate and key files if they don't not exist")
@arg("-W", "--overwrite", action="store_true", help="Save and overwrite certificate and key files")
def service__connection_info__kcat(self) -> None:
"""kcat command string"""
service = self.client.get_service(project=self.get_project(), service=self.args.service_name)
store = self._get_store_from_args()

if self.args.kafka_authentication_method == "certificate":
ci = KafkaCertificateConnectionInfo.from_service(
service,
route=self._get_route_from_args(),
privatelink_connection_id=self._get_privatelink_connection_id_from_args(),
username=self.args.username,
)
cmd = ci.kcat(
"kcat",
store,
get_project_ca=self._get_project_ca,
ca_path=self.args.ca_path,
client_cert_path=self.args.client_cert_path,
client_key_path=self.args.client_key_path,
)
elif self.args.kafka_authentication_method == "sasl":
sasl_ci = KafkaSASLConnectionInfo.from_service(
service,
route=self._get_route_from_args(),
privatelink_connection_id=self._get_privatelink_connection_id_from_args(),
username=self.args.username,
)
cmd = sasl_ci.kcat(
"kcat",
store,
get_project_ca=self._get_project_ca,
ca_path=self.args.ca_path,
Expand Down
28 changes: 20 additions & 8 deletions aiven/client/connection_info/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,33 @@ class KafkaConnectionInfo: # pylint: disable=too-few-public-methods
host: str
port: int

def _kafkacat(
self, protocol: str, ca_path: str, extra: Sequence[str], store: Store, get_project_ca: Callable[[], str]
def _kcat(
self,
tool_name: str,
protocol: str,
ca_path: str,
extra: Sequence[str],
store: Store,
get_project_ca: Callable[[], str],
) -> Sequence[str]:
store.handle(get_project_ca, ca_path)
address = f"{self.host}:{self.port}"
return ["kafkacat", "-b", address, "-X", f"security.protocol={protocol}", "-X", f"ssl.ca.location={ca_path}", *extra]
return [tool_name, "-b", address, "-X", f"security.protocol={protocol}", "-X", f"ssl.ca.location={ca_path}", *extra]


@dataclass
class KafkaCertificateConnectionInfo(KafkaConnectionInfo):
client_cert: str
client_key: str

def kafkacat(
self, store: Store, get_project_ca: Callable[[], str], ca_path: str, client_key_path: str, client_cert_path: str
def kcat(
self,
tool_name: str,
store: Store,
get_project_ca: Callable[[], str],
ca_path: str,
client_key_path: str,
client_cert_path: str,
) -> Sequence[str]:
store.handle(lambda: self.client_cert, client_cert_path)
store.handle(lambda: self.client_key, client_key_path)
Expand All @@ -37,7 +49,7 @@ def kafkacat(
"-X",
f"ssl.certificate.location={client_cert_path}",
]
return self._kafkacat("SSL", ca_path, extra, store, get_project_ca)
return self._kcat(tool_name, "SSL", ca_path, extra, store, get_project_ca)

@classmethod
def from_service(
Expand Down Expand Up @@ -100,7 +112,7 @@ def from_service(
raise ConnectionInfoError(f"Could not find password for username {username}")
return cls(host=info["host"], port=info["port"], username=username, password=user["password"])

def kafkacat(self, store: Store, get_project_ca: Callable[[], str], ca_path: str) -> Sequence[str]:
def kcat(self, tool_name: str, store: Store, get_project_ca: Callable[[], str], ca_path: str) -> Sequence[str]:
extra = [
"-X",
"sasl.mechanisms=SCRAM-SHA-256",
Expand All @@ -109,4 +121,4 @@ def kafkacat(self, store: Store, get_project_ca: Callable[[], str], ca_path: str
"-X",
f"sasl.password={self.password}",
]
return self._kafkacat("SASL_SSL", ca_path, extra, store, get_project_ca)
return self._kcat(tool_name, "SASL_SSL", ca_path, extra, store, get_project_ca)

0 comments on commit dfc4b4f

Please sign in to comment.