Skip to content

Commit

Permalink
Merge pull request #31 from oceanprotocol/feature/use-nonce
Browse files Browse the repository at this point in the history
Use nonce in signature
  • Loading branch information
ssallam authored Aug 4, 2020
2 parents 678f020 + e08c648 commit bf1308c
Show file tree
Hide file tree
Showing 15 changed files with 129 additions and 395 deletions.
4 changes: 3 additions & 1 deletion READMEs/developers.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ Outcome: DataTokenTemplate and DTFactory are deployed to ganache, rinkeby, or ma

If mainnet: ensure the `FACTORY_DEPLOYER_PRIVATE_KEY` is correct (= an OPF key).

Call the deploy script with (NETWORK = `ganache`, `rinkeby`, or `mainnet`) and (ADDRESSES_FILE_PATH to hold the deployed contracts addresses)
Call the deploy script with (NETWORK = `ganache`, `rinkeby`, or `mainnet`) and (ADDRESSES_FILE_PATH to hold the deployed contracts addresses).
When using already deployed contracts you can skip this, but make sure the `artifacts/addresses.json` file has the up-to-date contracts
addresses for the target network.
```console
./deploy.py ganache artifacts/addresses.json
```
Expand Down
1 change: 0 additions & 1 deletion READMEs/marketplace_flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ from ocean_utils.agreements.service_types import ServiceTypes

from ocean_lib.ocean import Ocean
from ocean_lib.ocean.util import from_base_18
from ocean_lib.web3_internal.web3helper import Web3Helper

#Market's config
config = {
Expand Down
2 changes: 2 additions & 0 deletions ocean_lib/assets/asset_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def download_asset_files(
token_address,
token_transfer_tx_id,
data_provider,
nonce,
index=None
):
"""
Expand All @@ -30,6 +31,7 @@ def download_asset_files(
:param token_address: hex str the address of the DataToken smart contract
:param token_transfer_tx_id: hex str the token transfer transaction id (hash)
:param data_provider: DataServiceProvider instance
:param nonce: int value to use in the signature
:param index: Index of the document that is going to be downloaded, int
:return: Asset folder path, str
"""
Expand Down
2 changes: 1 addition & 1 deletion ocean_lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def address_file(self):

@property
def storage_path(self):
"""Path to save the current execution of the service agreements and restart if needed."""
"""Path to local storage (database file)."""
return self.get('resources', NAME_STORAGE_PATH)

@property
Expand Down
79 changes: 48 additions & 31 deletions ocean_lib/data_provider/data_service_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from collections import namedtuple
from json import JSONDecodeError

from ocean_lib.config_provider import ConfigProvider
from ocean_lib.web3_internal.utils import add_ethereum_prefix_and_hash_msg
from ocean_utils.agreements.service_types import ServiceTypes
from ocean_utils.exceptions import OceanEncryptAssetUrlsError
Expand All @@ -21,7 +22,7 @@
logger = logging.getLogger(__name__)


OrderRequirements = namedtuple('OrderRequirements', ('amount', 'data_token_address', 'receiver_address'))
OrderRequirements = namedtuple('OrderRequirements', ('amount', 'data_token_address', 'receiver_address', 'nonce'))


class DataServiceProvider:
Expand Down Expand Up @@ -67,6 +68,25 @@ def encrypt_files_dict(files_dict, encrypt_endpoint, asset_id, publisher_address

return response.json()['encryptedDocument']

@staticmethod
def sign_message(wallet, msg, config, nonce=None):
if nonce is None:
nonce = DataServiceProvider.get_nonce(wallet.address, config)
return Web3Helper.sign_hash(
add_ethereum_prefix_and_hash_msg(f'{msg}{nonce}'),
wallet
)

@staticmethod
def get_nonce(user_address, config):
response = DataServiceProvider._http_client.get(
f'{DataServiceProvider.get_url(config)}/services/nonce?userAddress={user_address}'
)
if response.status_code != 200:
return None

return response.json()['nonce']

@staticmethod
def get_order_requirements(did, service_endpoint, consumer_address, service_id, service_type, token_address):
"""
Expand All @@ -77,7 +97,7 @@ def get_order_requirements(did, service_endpoint, consumer_address, service_id,
:param service_id:
:param service_type:
:param token_address:
:return: OrderRequirements instance -- named tuple (amount, data_token_address, receiver_address),
:return: OrderRequirements instance -- named tuple (amount, data_token_address, receiver_address, nonce),
"""
initialize_url = (
f'{service_endpoint}'
Expand All @@ -89,15 +109,15 @@ def get_order_requirements(did, service_endpoint, consumer_address, service_id,
)

logger.info(f'invoke the initialize endpoint with this url: {initialize_url}')
response = DataServiceProvider._http_client.get(initialize_url, stream=True)
response = DataServiceProvider._http_client.get(initialize_url)
# The returned json should contain information about the required number of tokens
# to consume `service_id`. If service is not available there will be an error or
# the returned json is empty.
if response.status_code != 200:
return None
order = dict(response.json())

return OrderRequirements(order['numTokens'], order['dataToken'], order['to'])
return OrderRequirements(order['numTokens'], order['dataToken'], order['to'], order['nonce'])

@staticmethod
def download_service(did, service_endpoint, wallet, files,
Expand All @@ -109,7 +129,7 @@ def download_service(did, service_endpoint, wallet, files,
:param did: str id of the asset
:param service_endpoint: Url to consume, str
:param wallet: hex str Wallet instance of the consumer signing this agreement
:param wallet: hex str Wallet instance of the consumer signing this request
:param files: List containing the files to be consumed, list
:param destination_folder: Path, str
:param service_id: integer the id of the service inside the DDO's service dict
Expand All @@ -119,10 +139,6 @@ def download_service(did, service_endpoint, wallet, files,
:param index: Index of the document that is going to be downloaded, int
:return: True if was downloaded, bool
"""
signature = Web3Helper.sign_hash(
add_ethereum_prefix_and_hash_msg(did),
wallet)

indexes = range(len(files))
if index is not None:
assert isinstance(index, int), logger.error('index has to be an integer.')
Expand All @@ -139,10 +155,11 @@ def download_service(did, service_endpoint, wallet, files,
f'&dataToken={token_address}'
f'&transferTxId={token_transfer_tx_id}'
f'&consumerAddress={wallet.address}'
f'&signature={signature}'
)
config = ConfigProvider.get_config()
for i in indexes:
download_url = base_url + f'&fileIndex={i}'
signature = DataServiceProvider.sign_message(wallet, did, config)
download_url = base_url + f'&signature={signature}&fileIndex={i}'
logger.info(f'invoke consume endpoint with this url: {download_url}')
response = DataServiceProvider._http_client.get(download_url, stream=True)
file_name = DataServiceProvider._get_file_name(response)
Expand Down Expand Up @@ -209,10 +226,10 @@ def start_compute_job(did, service_endpoint, consumer_address, signature,
raise

@staticmethod
def stop_compute_job(agreement_id, job_id, service_endpoint, consumer_address, signature):
def stop_compute_job(did, job_id, service_endpoint, consumer_address, signature):
"""
:param agreement_id: hex str Service Agreement Id
:param did: hex str the asset/DDO id
:param job_id: str id of compute job that was returned from `start_compute_job`
:param service_endpoint: str url of the provider service endpoint for compute service
:param consumer_address: hex str the ethereum address of the consumer's account
Expand All @@ -221,28 +238,28 @@ def stop_compute_job(agreement_id, job_id, service_endpoint, consumer_address, s
:return: bool whether the job was stopped successfully
"""
return DataServiceProvider._send_compute_request(
'put', agreement_id, job_id, service_endpoint, consumer_address, signature)
'put', did, job_id, service_endpoint, consumer_address, signature)

@staticmethod
def restart_compute_job(agreement_id, job_id, service_endpoint, consumer_address, signature):
def restart_compute_job(did, job_id, service_endpoint, consumer_address, signature):
"""
:param agreement_id: hex str Service Agreement Id
:param did: hex str the asset/DDO id
:param job_id: str id of compute job that was returned from `start_compute_job`
:param service_endpoint: str url of the provider service endpoint for compute service
:param consumer_address: hex str the ethereum address of the consumer's account
:param signature: hex str signed message to allow the provider to authorize the consumer
:return: bool whether the job was restarted successfully
"""
DataServiceProvider.stop_compute_job(agreement_id, job_id, service_endpoint, consumer_address, signature)
return DataServiceProvider.start_compute_job(agreement_id, service_endpoint, consumer_address, signature, job_id=job_id)
DataServiceProvider.stop_compute_job(did, job_id, service_endpoint, consumer_address, signature)
return DataServiceProvider.start_compute_job(did, service_endpoint, consumer_address, signature, job_id=job_id)

@staticmethod
def delete_compute_job(agreement_id, job_id, service_endpoint, consumer_address, signature):
def delete_compute_job(did, job_id, service_endpoint, consumer_address, signature):
"""
:param agreement_id: hex str Service Agreement Id
:param did: hex str the asset/DDO id
:param job_id: str id of compute job that was returned from `start_compute_job`
:param service_endpoint: str url of the provider service endpoint for compute service
:param consumer_address: hex str the ethereum address of the consumer's account
Expand All @@ -251,39 +268,39 @@ def delete_compute_job(agreement_id, job_id, service_endpoint, consumer_address,
:return: bool whether the job was deleted successfully
"""
return DataServiceProvider._send_compute_request(
'delete', agreement_id, job_id, service_endpoint, consumer_address, signature)
'delete', did, job_id, service_endpoint, consumer_address, signature)

@staticmethod
def compute_job_status(agreement_id, job_id, service_endpoint, consumer_address, signature):
def compute_job_status(did, job_id, service_endpoint, consumer_address, signature):
"""
:param agreement_id: hex str Service Agreement Id
:param did: hex str the asset/DDO id
:param job_id: str id of compute job that was returned from `start_compute_job`
:param service_endpoint: str url of the provider service endpoint for compute service
:param consumer_address: hex str the ethereum address of the consumer's account
:param signature: hex str signed message to allow the provider to authorize the consumer
:return: dict of job_id to status info. When job_id is not provided, this will return
status for each job_id that exist for the agreement_id
status for each job_id that exist for the did
"""
return DataServiceProvider._send_compute_request(
'get', agreement_id, job_id, service_endpoint, consumer_address, signature)
'get', did, job_id, service_endpoint, consumer_address, signature)

@staticmethod
def compute_job_result(agreement_id, job_id, service_endpoint, consumer_address, signature):
def compute_job_result(did, job_id, service_endpoint, consumer_address, signature):
"""
:param agreement_id: hex str Service Agreement Id
:param did: hex str the asset/DDO id
:param job_id: str id of compute job that was returned from `start_compute_job`
:param service_endpoint: str url of the provider service endpoint for compute service
:param consumer_address: hex str the ethereum address of the consumer's account
:param signature: hex str signed message to allow the provider to authorize the consumer
:return: dict of job_id to result urls. When job_id is not provided, this will return
result for each job_id that exist for the agreement_id
result for each job_id that exist for the did
"""
return DataServiceProvider._send_compute_request(
'get', agreement_id, job_id, service_endpoint, consumer_address, signature
'get', did, job_id, service_endpoint, consumer_address, signature
)

@staticmethod
Expand Down Expand Up @@ -354,11 +371,11 @@ def write_file(response, destination_folder, file_name):
logger.warning(f'consume failed: {response.reason}')

@staticmethod
def _send_compute_request(http_method, agreement_id, job_id, service_endpoint, consumer_address, signature):
def _send_compute_request(http_method, did, job_id, service_endpoint, consumer_address, signature):
compute_url = (
f'{service_endpoint}'
f'?signature={signature}'
f'&serviceAgreementId={agreement_id}'
f'&documentId={did}'
f'&consumerAddress={consumer_address}'
f'&jobId={job_id or ""}'
)
Expand Down
14 changes: 10 additions & 4 deletions ocean_lib/ocean/ocean_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,10 @@ def create(self, metadata, publisher_wallet: Wallet,
if compute_service:
asset.add_service(compute_service)

publisher_signature = Web3Helper.sign_hash(
asset.proof['signatureValue'] = Web3Helper.sign_hash(
add_ethereum_prefix_and_hash_msg(asset.asset_id),
publisher_wallet
)
asset.proof['signatureValue'] = publisher_signature

# Add public key and authentication
asset.add_public_key(did, publisher_wallet.address)
Expand All @@ -167,6 +166,7 @@ def create(self, metadata, publisher_wallet: Wallet,
'files is required in the metadata main attributes.'
logger.debug('Encrypting content urls in the metadata.')

publisher_signature = self._data_provider.sign_message(publisher_wallet, asset.asset_id, self._config)
encrypt_endpoint = self._data_provider.get_encrypt_endpoint(self._config)
files_encrypted = self._data_provider.encrypt_files_dict(
metadata_copy['main']['files'],
Expand Down Expand Up @@ -297,7 +297,7 @@ def order(self, did, consumer_address, service_index=None, service_type=None):
:param consumer_address:
:param service_index:
:param service_type:
:return: OrderRequirements instance -- named tuple (amount, data_token_address, receiver_address),
:return: OrderRequirements instance -- named tuple (amount, data_token_address, receiver_address, nonce),
"""
assert service_type or service_index, f'One of service_index or service_type is required.'
asset = self.resolve(did)
Expand Down Expand Up @@ -347,7 +347,8 @@ def pay_for_service(amount, token_address, receiver_address, from_account):
logger.error(msg)
raise AssertionError(msg)

def download(self, did, service_index, consumer_account, transfer_tx_id, destination, index=None):
def download(self, did, service_index, consumer_account,
transfer_tx_id, destination, nonce=None, index=None):
"""
Consume the asset data.
Expand All @@ -363,6 +364,7 @@ def download(self, did, service_index, consumer_account, transfer_tx_id, destina
:param consumer_account: Account instance of the consumer
:param transfer_tx_id: hex str id of the token transfer transaction
:param destination: str path
:param nonce: int value to use in the signature
:param index: Index of the document that is going to be downloaded, int
:return: str path to saved files
"""
Expand All @@ -371,6 +373,9 @@ def download(self, did, service_index, consumer_account, transfer_tx_id, destina
assert isinstance(index, int), logger.error('index has to be an integer.')
assert index >= 0, logger.error('index has to be 0 or a positive integer.')

if nonce is None:
nonce = self._data_provider.get_nonce(consumer_account.address, self._config)

service = asset.get_service_by_index(service_index)
assert service and service.type == ServiceTypes.ASSET_ACCESS, \
f'Service with index {service_index} and type {ServiceTypes.ASSET_ACCESS} is not found.'
Expand All @@ -383,6 +388,7 @@ def download(self, did, service_index, consumer_account, transfer_tx_id, destina
asset.data_token_address,
transfer_tx_id,
self._data_provider,
nonce,
index
)

Expand Down
Loading

0 comments on commit bf1308c

Please sign in to comment.