-
Notifications
You must be signed in to change notification settings - Fork 168
Description
While trying to use some simple Python like this:
# json is needed by json.load() below; without this import the script
# raises NameError before it ever reads the config.
import json

from OTXv2 import OTXv2

# Load the OTX API key from a local JSON config file.
try:
    with open('config.json', 'r') as file:
        config = json.load(file)
except (OSError, ValueError):
    # OSError: file missing or unreadable; ValueError: contents are not valid JSON.
    quit('config.json not found, or unreadable.')

# Ask the OTXv2 client for the pulses owned by the API key's user.
otx = OTXv2(config['otx_api_key'])
pulses = otx.get_my_pulses(max_items=200)
print(pulses)
The result was always an empty list, [], no matter what arguments I passed to the get_my_pulses function.
I started digging into the source of the OTXv2 library and hacked together the equivalent necessary functions using the requests library:
from requests import Session
from requests.packages.urllib3.util import Retry
from requests.adapters import HTTPAdapter
import json
# Load the OTX API key from a local JSON config file.
try:
    with open('config.json', 'r') as file:
        config = json.load(file)
except (OSError, ValueError):
    # OSError: file missing or unreadable; ValueError: contents are not valid JSON.
    quit('config.json not found, or unreadable.')

api = "https://otx.alienvault.com"
headers = {
    'X-OTX-API-KEY': config['otx_api_key'],
    'Content-Type': 'application/json'
}

# Session whose HTTPS adapter retries (up to 5 times in total) on the listed
# HTTP status codes.
session = Session()
session.mount('https://', HTTPAdapter(
    max_retries=Retry(
        total=5,
        status_forcelist=[429, 500, 502, 503, 504],
        backoff_factor=1,
    )
))

# Query the "my pulses" endpoint directly, with no query parameters at all.
response = session.get(api+'/api/v1/pulses/my', headers=headers).json()
print(response['count'])
This yielded the expected response, 214, which is accurate.
Upon digging more into the OTXv2 library functions, I hacked this together to give me just the necessary bits so that I could place some print statements and find what URLs were being formed.
import requests
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
import json
# Base URL of the OTX server that every request path is resolved against.
server = "https://otx.alienvault.com"

# Path prefix of the v1 API and the endpoint listing the caller's own pulses.
API_V1_ROOT = "/api/v1"
MY_PULSES = API_V1_ROOT + "/pulses/my"

# Headers sent with GET requests: the API key plus a JSON content type.
headers = {
    'X-OTX-API-KEY': '<redacted>',
    'Content-Type': 'application/json',
}
def post(url, body=None, headers=None, files=None, **kwargs):
    """
    Internal API for POST request on a OTX URL

    :param url: URL (or path) to post to; it is resolved through create_url()
    :param body: (optional) object serialized with json.dumps() as the HTTP body;
        any falsy value (None, {}, []) results in no body being sent
    :param headers: (optional) dict of headers handed to requests as-is. This
        parameter shadows the module-level ``headers``, so when it is omitted
        no custom headers (including the API key) are sent.
    :param files: (optional) list of file tuples, if posting multipart form data
    :param kwargs: key/value pairs forwarded to create_url() as query parameters
    :return: response as dict
    """
    # Use the session as a context manager so it is closed after the request
    # instead of being left open on every call.
    with requests.session() as session:
        response = session.post(
            create_url(url, **kwargs),
            data=json.dumps(body) if body else None,
            files=files,
            headers=headers,
        )
    # Decode the body once and reuse it for both the debug dump and the return.
    payload = response.json()
    print(json.dumps(payload, indent=2))
    return payload
def get(url, **kwargs):
    """
    Internal API for GET request on a OTX URL

    The request is sent with the module-level ``headers`` dict.

    :param url: URL (or path) to retrieve; it is resolved through create_url()
    :param kwargs: key/value pairs forwarded to create_url() as query parameters
    :return: response in JSON object form
    """
    # Use the session as a context manager so it is closed after the request
    # instead of being left open on every call.
    with requests.session() as session:
        response = session.get(create_url(url, **kwargs), headers=headers)
    # Decode the body once and reuse it for both the debug dump and the return.
    payload = response.json()
    print(json.dumps(payload, indent=2))
    return payload
def walkapi_iter(url, max_page=None, max_items=None, method='GET', body=None):
    """
    Lazily yield the items of a paginated OTX endpoint, page by page.

    Each page is expected to be a dict with a ``results`` list and a ``next``
    entry holding the URL of the following page (falsy on the last page).

    :param url: URL of the first page; a falsy value yields nothing
    :param max_page: (optional) maximum number of pages to request
    :param max_items: (optional) maximum number of items to yield
    :param method: 'GET' or 'POST', selects get() or post() for each page
    :param body: (optional) request body passed to post() when method is 'POST'
    :return: generator over the individual result items
    :raises Exception: if ``method`` is neither 'GET' nor 'POST'
    """
    next_page_url = url
    print('next_page_url', next_page_url)
    page_count = 0
    item_count = 0
    while next_page_url:
        page_count += 1
        if max_page and page_count > max_page:
            break
        if method == 'GET':
            data = get(next_page_url)
        elif method == 'POST':
            data = post(next_page_url, body=body)
        else:
            raise Exception("Unsupported method type: {}".format(method))
        for el in data['results']:
            # Stop the whole walk once the limit is hit; a plain `break` here
            # would only leave this page and keep requesting later pages.
            if max_items and item_count >= max_items:
                return
            item_count += 1
            yield el
        # Limit reached exactly at a page boundary: do not fetch another page.
        if max_items and item_count >= max_items:
            return
        next_page_url = data["next"]
def create_url(url_path, **kwargs):
    """ Turn a path into a valid fully formatted URL. Supports query parameter formatting as well.

    Every key in ``kwargs`` is URL-encoded as given, so a value of None is sent
    literally (e.g. ``q=None``); callers that do not want a parameter sent must
    leave it out.

    :param url_path: Request path (i.e. "/search/pulses") or an absolute URL;
        a ``{}`` placeholder in it is replaced with the server URL
    :param kwargs: key value pairs to be added as query parameters (i.e. limit=10, page=5)
    :return: a formatted url (i.e. "https://otx.alienvault.com/search/pulses?limit=10&page=5")
    """
    print('----start create url----')
    uri = url_path.format(server)
    print(uri)
    # Only prefix the server when the path is not already an absolute URL.
    uri = uri if uri.startswith("http") else server.rstrip('/') + uri
    print(uri)
    if kwargs:
        # An already-built URL may carry a query string; extend it with "&"
        # rather than adding a second "?".
        separator = "&" if "?" in uri else "?"
        uri += separator + urlencode(kwargs)
    print(uri)
    print('----stop create url---')
    return uri
def walkapi(url, iter=False, max_page=None, max_items=None, method='GET', body=None):
    """
    Walk a paginated OTX endpoint and return its items.

    :param url: URL of the first page
    :param iter: when true, return the lazy generator from walkapi_iter();
        otherwise return a fully materialized list
    :param max_page: (optional) maximum number of pages to request
    :param max_items: (optional) maximum number of items to return
    :param method: 'GET' or 'POST', forwarded to walkapi_iter()
    :param body: (optional) request body for POST requests
    :return: generator of items if ``iter`` is true, else a list of items
    """
    # Build the walker once. Creating a second one just for the debug print
    # would repeat every HTTP request on the list path.
    results = walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body)
    if not iter:
        results = list(results)
    print(results)
    return results
def get_my_pulses(query=None, max_items=200):
print('pulse?', walkapi(create_url(MY_PULSES, limit=50, q=query), max_items=max_items))
#print(walkapi(create_url(MY_PULSES, limit=50)))
return walkapi(create_url(MY_PULSES, limit=50, q=query), max_items=max_items)
This consistently yielded URLs like the following, as printed by the create_url() function:
----start create url----
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=None
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=None
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=None
----stop create url---
q= is not a valid API query parameter according to the OTX API documentation: https://otx.alienvault.com/api
Only `limit`, `page`, and `since` are.
I haven't completely learned the OTXv2 code base, but I don't see anywhere where q= is supposed to be formatted into something the API accepts from the above list of valid query parameters.
If you provide any sort of data by passing it in with get_my_pulses(query=<anything>) it always formats the URL as
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=<anything> which the API responds to with 0 results.
Am I missing something?
Note: I saw similar issues with get_user_pulses() but it appears to be virtually identical just with a different API endpoint URL.