Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 68 additions & 15 deletions pyunifi/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def __init__( # pylint: disable=r0913
raise APIError("%s controllers no longer supported" % version)

if ssl_verify is False:
warnings.simplefilter("default", category=InsecureRequestWarning)
warnings.simplefilter("ignore", category=InsecureRequestWarning)

self.log.debug("Controller for %s", self.url)
self._login()
Expand All @@ -128,6 +128,9 @@ def _jsondec(data):
def _api_url(self):
return self.url + "api/s/" + self.site_id + "/"

def _api2_url(self):
return self.url + "v2/api/site/" + self.site_id + "/"

@retry_login
def _read(self, url, params=None):
# Try block to handle the unifi server being offline.
Expand All @@ -138,8 +141,13 @@ def _read(self, url, params=None):

return self._jsondec(response.text)

def _api_read(self, url, params=None):
return self._read(self._api_url() + url, params)
def _api_read(self, url, version=1, params=None):
if version == 1:
return self._read(self._api_url() + url, params)
elif version == 2:
return self._read(self._api2_url() + url, params)
else:
raise NotImplementedError("Protocol version {} not known (yet)".format(version))

@retry_login
def _write(self, url, params=None):
Expand All @@ -150,8 +158,13 @@ def _write(self, url, params=None):

return self._jsondec(response.text)

def _api_write(self, url, params=None):
return self._write(self._api_url() + url, params)
def _api_write(self, url, version=1, params=None):
if version == 1:
return self._write(self._api_url() + url, params)
elif version == 2:
return self._write(self._api2_url() + url, params)
else:
raise NotImplementedError("Protocol version {} not known (yet)".format(version))

@retry_login
def _update(self, url, params=None):
Expand All @@ -162,8 +175,13 @@ def _update(self, url, params=None):

return self._jsondec(response.text)

def _api_update(self, url, params=None):
return self._update(self._api_url() + url, params)
def _api_update(self, url, version=1, params=None):
if version == 1:
return self._update(self._api_url() + url, params)
elif version == 2:
return self._update(self._api2_url() + url, params)
else:
raise NotImplementedError("Protocol version {} not known (yet)".format(version))

@retry_login
def _delete(self, url, params=None):
Expand Down Expand Up @@ -360,6 +378,13 @@ def delete_radius_user(self, user_id):
"""
return self._api_delete('rest/account/' + user_id)

def get_wifi(self):
return self._api_read('wlan/enriched-configuration', version=2)

def set_wifi(self, wifi_id, params):
return self._api_update('rest/wlanconf/' + wifi_id, version=1, params=params)


def get_switch_port_overrides(self, target_mac):
"""Gets a list of port overrides, in dictionary
format, for the given target MAC address. The
Expand Down Expand Up @@ -527,6 +552,27 @@ def archive_all_alerts(self):
"""Archive all Alerts"""
return self._run_command("archive-all-alarms", mgr="evtmgr")

def list_autobackups(self):
backups = self._run_command(
"list-backups",
mgr="backup",
params={"cmd":"list-backups"}
)

sortedbackups = sorted(backups, key=lambda d: d['time'], reverse=True)
return sortedbackups

def get_last_autobackup(self, target_file=None):
backups = self.list_autobackups()
filename = backups[0]['filename']
download_path = 'dl/autobackup/' + filename
if not target_file:
target_file = filename

self.get_backup(download_path=download_path, target_file=target_file)
return target_file


# TODO: Not currently supported on UDMP as it now utilizes async-backups.
def create_backup(self, days="0"):
"""Ask controller to create a backup archive file
Expand All @@ -543,13 +589,20 @@ def create_backup(self, days="0"):
raise APIError(
"Controller version not supported: %s" % self.version
)

res = self._run_command(
"backup",
mgr="system",
params={"days": days}
)
return res[0]["url"]
elif self.version == "v5":
res = self._run_command(
"backup",
mgr="backup",
params={"days": days, "cmd": "backup"}
)
return res[0]["url"]
else:
res = self._run_command(
"backup",
mgr="system",
params={"days": days}
)
return res[0]["url"]

# TODO: Not currently supported on UDMP as it now utilizes async-backups.
def get_backup(self, download_path=None, target_file="unifi-backup.unf"):
Expand All @@ -569,7 +622,7 @@ def get_backup(self, download_path=None, target_file="unifi-backup.unf"):

response = self.session.get(self.url + download_path, stream=True)

if response != 200:
if response.status_code != 200:
raise APIError("API backup failed: %i" % response.status_code)

with open(target_file, "wb") as _backfh:
Expand Down
13 changes: 9 additions & 4 deletions unifi-copy-radius
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
#!/usr/bin/env python3

import argparse
import json

Expand All @@ -15,13 +15,18 @@ parser.add_argument('-s', '--siteid', default='default', help='the source site I
parser.add_argument('-S', '--siteid2', default='', help='the destination site ID, to copy to')
parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates')
parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file')
parser.add_argument('-d', '--debug', default=False, help='enable debug output', action='store_true')
args = parser.parse_args()

ssl_verify = (not args.no_ssl_verify)

if ssl_verify and len(args.certificate) > 0:
ssl_verify = args.certificate

ssl_verify = args.certificate

if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)

controller_source = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify)
controller_dest = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid2, ssl_verify=ssl_verify)

Expand Down
32 changes: 32 additions & 0 deletions unifi-create-backup
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python3

import argparse
import time

from pyunifi.controller import Controller

parser = argparse.ArgumentParser()
parser.add_argument('-c', '--controller', default='unifi', help='the controller address (default "unifi")')
parser.add_argument('-u', '--username', default='admin', help='the controller username (default("admin")')
parser.add_argument('-p', '--password', default='', help='the controller password')
parser.add_argument('-b', '--port', default='8443', help='the controller port (default "8443")')
parser.add_argument('-v', '--version', default='v5', help='the controller base version (default "v5")')
parser.add_argument('-s', '--siteid', default='default', help='the site ID, UniFi >=3.x only (default "default")')
parser.add_argument('-d', '--debug', default=False, help='enable debug output', action='store_true')
parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates')
parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file')
parser.add_argument('-f', '--file', default='unifi-backup.unf', help='the filename for the backup')
args = parser.parse_args()

ssl_verify = (not args.no_ssl_verify)

if ssl_verify and len(args.certificate) > 0:
ssl_verify = args.certificate

if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)

c = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify)

c.get_backup(target_file=args.file)
13 changes: 9 additions & 4 deletions unifi-create-voucher
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3

import argparse

Expand All @@ -13,12 +13,17 @@ parser.add_argument('-v', '--version', default='v5', help='the controller base v
parser.add_argument('-s', '--siteid', default='default', help='the site ID, UniFi >=3.x only (default "default")')
parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates')
parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file')
parser.add_argument('-d', '--debug', default=False, help='enable debug output', action='store_true')
args = parser.parse_args()

ssl_verify = (not args.no_ssl_verify)

if ssl_verify and len(args.certificate) > 0:
ssl_verify = args.certificate
ssl_verify = args.certificate

if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)

c = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify)

Expand All @@ -34,5 +39,5 @@ def format_code(string):
return first_half + '-' + second_half

voucher_code = format_code(code)
print(voucher_code)

print(voucher_code)
9 changes: 7 additions & 2 deletions unifi-disconnect-client
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3

import argparse

Expand All @@ -11,12 +11,17 @@ parser.add_argument('-p', '--password', default='', help='the controller passwor
parser.add_argument('-b', '--port', default='8443', help='the controller port (default "8443")')
parser.add_argument('-v', '--version', default='v5', help='the controller base version (default "v5")')
parser.add_argument('-s', '--siteid', default='default', help='the site ID, UniFi >=3.x only (default "default")')
parser.add_argument('-d', '--debug', default=False, help='enable debug output', action='store_true')
parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates')
parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file')
parser.add_argument('-d', '--debug', default=False, help='enable debug output', action='store_true')
parser.add_argument('macs', metavar='MAC', nargs='+', help='Client MAC address(es)')
args = parser.parse_args()

ssl_verify = (not args.no_ssl_verify)

if ssl_verify and len(args.certificate) > 0:
ssl_verify = args.certificate

if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)
Expand Down
50 changes: 50 additions & 0 deletions unifi-download-autobackup
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python3

import argparse
import os

from pyunifi.controller import Controller

def credFromCredsDir(credname):
if "CREDENTIALS_DIRECTORY" in os.environ.keys():
credfilename = os.path.join(os.environ["CREDENTIALS_DIRECTORY"], credname)
try:
with open(credfilename, 'r') as credfile:
cred = credfile.read().rstrip()
except:
sys.stderr.write("Could not read cred {} from {}".format(credname, credfilename))
sys.exit(2)
else:
sys.stderr.write("CREDENTIALS_DIRECTORY environment not provided")
sys.exit(1)
return cred

parser = argparse.ArgumentParser()
parser.add_argument('-c', '--controller', default='unifi', help='the controller address (default "unifi")')
parser.add_argument('-u', '--username', default='admin', help='the controller username (default("admin")')
parser.add_argument('-p', '--password', default='', help='the controller password')
parser.add_argument('-b', '--port', default='8443', help='the controller port (default "8443")')
parser.add_argument('-v', '--version', default='v5', help='the controller base version (default "v5")')
parser.add_argument('-s', '--siteid', default='default', help='the site ID, UniFi >=3.x only (default "default")')
parser.add_argument('-d', '--debug', default=False, help='enable debug output', action='store_true')
parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates')
parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file')
args = parser.parse_args()

ssl_verify = (not args.no_ssl_verify)

if ssl_verify and len(args.certificate) > 0:
ssl_verify = args.certificate

if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)

apipassword = args.password
if not apipassword:
apipassword = credFromCredsDir("api-password")

c = Controller(args.controller, args.username, apipassword, args.port, args.version, args.siteid, ssl_verify=ssl_verify)

targetfile = c.get_last_autobackup()
print("Got backup {}".format(targetfile))
Loading