Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions compiler/definitions/ir/nodes/dfs_split_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ def __init__(self, inputs, outputs, com_name, com_category,
def set_server_address(self, addr): # ex addr: 127.0.0.1:50051
self.com_options.append((3, Arg(string_to_argument(f"--addr {addr}"))))

def make_dfs_split_reader_node(inputs, output, split_num, prefix):
def make_dfs_split_reader_node(inputs, output, split_num):
split_reader_bin = os.path.join(config.PASH_TOP, config.config['runtime']['dfs_split_reader_binary'])
com_name = Arg(string_to_argument(split_reader_bin))
com_category = "pure"
options = []
options.append((1, Arg(string_to_argument(f"--prefix '{prefix}'"))))
options.append((2, Arg(string_to_argument(f"--split {split_num}"))))
options.append((1, Arg(string_to_argument(f"--split {split_num}"))))

return DFSSplitReader(inputs,
[output],
Expand Down
44 changes: 27 additions & 17 deletions compiler/dspash/hdfs_file_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from collections import namedtuple
import json
from typing import List, Tuple
import requests

HDFSBlock = namedtuple("HDFSBlock", "path hosts")

# The Bash helper function name (defined in hdfs_utils.sh) for getting the local block path
HDFS_BLOCK_PATH_FUNC = "get_hdfs_block_path"

class FileData(object):
def __init__(self, filename):
Expand All @@ -27,13 +29,7 @@ def paths(self):
for i in range(len(self.blocknames)):
filepaths.append(
os.path.join(
"current",
self.dnodenames[i],
"current",
"finalized",
"subdir0",
"subdir0",
self.blocknames[i],
f"$({HDFS_BLOCK_PATH_FUNC} {self.dnodenames[i]} {self.blocknames[i]})"
)
)
return filepaths
Expand Down Expand Up @@ -65,15 +61,28 @@ def __eq__(self, __o: object) -> bool:
return False
return self.blocks == __o.blocks

def get_hdfs_file_data(filename):
info = FileData(filename)
log = subprocess.check_output(
"hdfs fsck {0} -files -blocks -locations".format(filename), shell=True, stderr=subprocess.PIPE
)
def get_hdfs_file_data(filepath):
# Workaround included quotation marks when cat is called with this notation"${IN}"
# TODO: this should be fixed somewhere higher in the stack
filepath = filepath.lstrip("\"").rstrip("\"")

# Use webhdfs to get the block data as it's much faster
# TODO: don't hardcode the namenode address
url = f"http://namenode:9870/fsck"
params = {
'ugi': 'root',
'files': '1',
'blocks': '1',
'locations': '1',
'path': filepath
}
info = FileData(filepath)
r = requests.get(url = url, params = params)

count = 0
for line in log.splitlines():
for line in r.text.splitlines():
wordarr = line.split()
if len(wordarr) > 0 and wordarr[0].decode("utf-8") == filename and count == 0:
if len(wordarr) > 0 and wordarr[0] == filename and count == 0:
info.size = int(wordarr[1])
count += 1
elif (
Expand All @@ -83,13 +92,14 @@ def get_hdfs_file_data(filename):
and int(wordarr[0][:-1]) == count - 1
):
count += 1
rawinfo = wordarr[1].decode("utf-8").split(":")
rawinfo = wordarr[1].split(":")
info.blocknames.append(rawinfo[1][0 : rawinfo[1].rfind("_")])
info.dnodenames.append(rawinfo[0])
stline = line.decode("utf-8")
stline = line
info.machines.append(
_getIPs(stline[stline.find("DatanodeInfoWithStorage") - 1 :])
)

assert len(info.blocknames) != 0
assert len(info.dnodenames) != 0
assert info.size > 0
Expand Down
12 changes: 12 additions & 0 deletions compiler/dspash/hdfs_utils.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Helper functions and env for hdfs support

datanode_dir=$(hdfs getconf -confKey dfs.datanode.data.dir)
export HDFS_DATANODE_DIR=${datanode_dir#"file://"} # removes file:// prefix

# Print the local path of an HDFS block file.
#   $1 - directory name to search under "$HDFS_DATANODE_DIR/current"
#   $2 - block file name to look for
# Every path matched by `find` is written to stdout, one per line.
function get_hdfs_block_path() {
    # `local` keeps these names out of the caller's shell when this file is sourced.
    local dnodeName="$1"
    local blockID="$2"
    find "$HDFS_DATANODE_DIR/current/$dnodeName" -name "$blockID"
}

export -f get_hdfs_block_path
206 changes: 149 additions & 57 deletions compiler/dspash/ir_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import subprocess
import pash_runtime
from collections import deque, defaultdict
import stat, os

HOST = socket.gethostbyname(socket.gethostname())
NEXT_PORT = 58000
Expand Down Expand Up @@ -75,6 +76,26 @@ def to_shell_file(graph: IR, args) -> str:
f.write(script)
return filename

def optimize_named_fifos(graph):
    """
    Replaces named fifos with ephemeral fifos when we
    know both the read and write ends of the named fifo.

    A fid qualifies when it reports a file resource, its path is a FIFO
    according to ``os.stat``, and its entry in ``graph.edges`` has both a
    from-node and a to-node. Qualifying fids are switched with
    ``make_ephemeral()`` and the same ``graph`` object is returned.
    """
    named_fifos = set()
    for fid in graph.all_fids():
        path = str(fid.get_resource())
        try:
            if fid.has_file_resource() and stat.S_ISFIFO(os.stat(path).st_mode):
                _, from_node, to_node = graph.edges[fid.ident]
                if from_node is not None and to_node is not None:
                    named_fifos.add(fid)
        except Exception:
            # Best effort: a fid whose path cannot be inspected (missing
            # file, unknown edge, ...) is simply left untouched. Catching
            # Exception rather than everything lets KeyboardInterrupt and
            # SystemExit propagate.
            continue

    # Mutate only after the scan so the checks above see unmodified fids.
    for named_fifo in named_fifos:
        named_fifo.make_ephemeral()

    return graph

def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:
""" Takes an optimized IR and splits it subgraphs. Every subgraph
is a continues section between a splitter and a merger.
Expand All @@ -101,9 +122,18 @@ def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:
source_node_ids = graph.source_nodes()
input_fifo_map = defaultdict(list)

subgraphs = []
graph = optimize_named_fifos(graph)

subgraphs = set()
queue = deque([(source, IR({}, {})) for source in source_node_ids])

# Set next_graph_policy to the wanted policy
combine_after_merge_policy = lambda combining_subgraph=queue[0][1]: combining_subgraph
next_graph_policy = combine_after_merge_policy
# Comment the above and uncomment below to change graph splitting policy
# new_graph_policy: lambda : IR({}, {})
# next_graph_policy = new_graph_policy

# Graph is a DAG so we need to keep track of traversed edges
visited_edges = set(graph.all_input_fids())
visited_nodes = set()
Expand All @@ -115,14 +145,14 @@ def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:

if(any(map(lambda fid:fid not in visited_edges, input_fids))):
if subgraph.source_nodes():
subgraphs.append(subgraph)
subgraphs.add(subgraph)
continue

# Second condition makes sure we don't add empty graphs
if len(input_fids) > 1 and subgraph.source_nodes(): # merger node
if subgraph not in subgraphs:
subgraphs.append(subgraph)
subgraph = IR({}, {})
subgraphs.add(subgraph)
subgraph = next_graph_policy()

if old_node_id in visited_nodes:
continue
Expand Down Expand Up @@ -162,11 +192,13 @@ def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:
if len(next_ids) == 1:
queue.append((next_ids[0], subgraph))
else:
subgraphs.append(subgraph)
subgraphs.add(subgraph)
for next_id in next_ids:
queue.append((next_id, IR({}, {})))

# print(list(map(lambda k : k.all_fids(), graphs)))
queue.append((next_id, next_graph_policy()))

# for graph in subgraphs:
# print(to_shell(graph, config.pash_args), file=sys.stderr)

return subgraphs, input_fifo_map

def add_stdout_fid(graph : IR, file_id_gen: FileIdGen) -> FileId:
Expand All @@ -175,7 +207,56 @@ def add_stdout_fid(graph : IR, file_id_gen: FileIdGen) -> FileId:
graph.add_edge(stdout)
return stdout

def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, input_fifo_map:Dict[int, IR], get_worker: Callable) -> (IR, Tuple):

def create_remote_pipe_from_output_edge(from_subgraph: IR, to_subgraph: IR, edge: FileId, host, port, file_id_gen: FileIdGen):
    """Route ``edge`` from ``from_subgraph`` to ``to_subgraph`` through a remote pipe.

    A remote-write node is appended to ``from_subgraph`` and a matching
    remote-read node (sharing the same uuid) is added to ``to_subgraph``.
    Both nodes are created with the given ``host`` and ``port``.
    """
    writer_stdout = add_stdout_fid(from_subgraph, file_id_gen)
    original_id = edge.get_ident()

    # Swap in a fresh ephemeral edge on the writer side so the original
    # edge is not modified in case another subgraph still uses it.
    outgoing = file_id_gen.next_ephemeral_file_id()
    from_subgraph.replace_edge(original_id, outgoing)
    pipe_uid = uuid4()

    # Writer end: placed at the end of the producing subgraph.
    from_subgraph.add_node(
        remote_pipe.make_remote_pipe(
            [outgoing.get_ident()], [writer_stdout.get_ident()], host, port, False, pipe_uid
        )
    )

    # Reader side gets a new edge carrying the original edge's resource.
    incoming = file_id_gen.next_file_id()
    incoming.set_resource(edge.get_resource())
    if edge_is_known := (original_id in to_subgraph.edges):
        # The consuming subgraph already has this edge: replace it.
        to_subgraph.replace_edge(original_id, incoming)
    if not edge_is_known:
        to_subgraph.add_edge(incoming)

    # Reader end: same uuid as the writer above.
    to_subgraph.add_node(
        remote_pipe.make_remote_pipe([], [incoming.get_ident()], host, port, True, pipe_uid)
    )

def create_remote_pipe_from_input_edge(from_subgraph: IR, to_subgraph: IR, edge: FileId, host, port, file_id_gen: FileIdGen):
    """Feed the input ``edge`` of ``to_subgraph`` from ``from_subgraph`` through a remote pipe.

    A copy of the edge (same resource) is added to ``from_subgraph`` together
    with a remote-write node, and the edge in ``to_subgraph`` is replaced by
    an ephemeral edge filled by a matching remote-read node.

    ``host`` and ``port`` are passed to both remote pipe nodes. Previously
    these parameters were ignored in favour of the module-level ``HOST`` and
    ``DISCOVERY_PORT``.
    """
    stdout = add_stdout_fid(from_subgraph, file_id_gen)

    # Copy the old input edge resource
    new_edge = file_id_gen.next_file_id()
    new_edge.set_resource(edge.get_resource())
    from_subgraph.add_edge(new_edge)

    # Add remote write to the producing subgraph
    edge_uid = uuid4()
    remote_write = remote_pipe.make_remote_pipe([new_edge.get_ident()], [stdout.get_ident()], host, port, False, edge_uid)
    from_subgraph.add_node(remote_write)

    # Add remote read to the consuming subgraph
    ephemeral_edge = file_id_gen.next_ephemeral_file_id()
    old_edge_id = edge.get_ident()

    to_subgraph.replace_edge(old_edge_id, ephemeral_edge)

    # Same uuid as the remote write above
    remote_read = remote_pipe.make_remote_pipe([], [ephemeral_edge.get_ident()], host, port, True, edge_uid)
    to_subgraph.add_node(remote_read)

def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, input_fifo_map:Dict[int, IR], get_worker: Callable) -> Tuple[IR, List]:
""" Takes a list of subgraphs and assigns a worker to each subgraph and augment
the subgraphs with the necessary remote read/write nodes for data movement
between workers. This function also produces graph that should run in
Expand Down Expand Up @@ -203,60 +284,71 @@ def assign_workers_to_subgraphs(subgraphs:List[IR], file_id_gen: FileIdGen, inpu
worker._running_processes += 1
worker_subgraph_pairs.append((worker, subgraph))
sink_nodes = subgraph.sink_nodes()
assert(len(sink_nodes) == 1)

for out_edge in subgraph.get_node_output_fids(sink_nodes[0]):
stdout = add_stdout_fid(subgraph, file_id_gen)
out_edge_id = out_edge.get_ident()
# Replace the old edge with an ephemeral edge in case it isn't and
# to avoid modifying the edge in case it's used in some other subgraph
ephemeral_edge = file_id_gen.next_ephemeral_file_id()
subgraph.replace_edge(out_edge_id, ephemeral_edge)
edge_uid = uuid4()
# Add remote-write node at the end of the subgraph
remote_write = remote_pipe.make_remote_pipe([ephemeral_edge.get_ident()], [stdout.get_ident()], worker.host(), DISCOVERY_PORT, False, edge_uid)
subgraph.add_node(remote_write)

# Copy the old output edge resource
new_edge = file_id_gen.next_file_id()
new_edge.set_resource(out_edge.get_resource())
# Get the subgraph which "edge" writes to
if out_edge_id in input_fifo_map and out_edge.is_ephemeral():
# Copy the old output edge resource
matching_subgraph = input_fifo_map[out_edge_id][0]
matching_subgraph.replace_edge(out_edge.get_ident(), new_edge)
else:
matching_subgraph = main_graph
matching_subgraph.add_edge(new_edge)

remote_read = remote_pipe.make_remote_pipe([], [new_edge.get_ident()], worker.host(), DISCOVERY_PORT, True, edge_uid)
matching_subgraph.add_node(remote_read)
for sink_node in sink_nodes:
for out_edge in subgraph.get_node_output_fids(sink_node):
out_edge_id = out_edge.get_ident()
if out_edge_id in input_fifo_map and out_edge.is_ephemeral():
# Copy the old output edge resource
matching_subgraph = input_fifo_map[out_edge_id][0]
else:
matching_subgraph = main_graph

# Replace non ephemeral input edges with remote read/write
for subgraph in subgraphs:
source_nodes = subgraph.source_nodes()
for source in source_nodes:
for in_edge in subgraph.get_node_input_fids(source):
if in_edge.has_file_resource() or in_edge.has_file_descriptor_resource():
# setup
stdout = add_stdout_fid(main_graph, file_id_gen)
if matching_subgraph == subgraph:
continue

# In case this fid from and to nodes are in the same graph
# we need to both read/write the fid from/to the main machine
# TODO: we might want to optimize this by offloading everything to the
# main subgraph
to_node = subgraph.get_edge_to(out_edge_id)
if to_node:
# remove previous connection
subgraph.set_edge_to(out_edge_id, None)

# Copy the old input edge resource
new_edge = file_id_gen.next_file_id()
new_edge.set_resource(in_edge.get_resource())
main_graph.add_edge(new_edge)
# Add new connectiong
new_output_edge = file_id_gen.next_file_id()
new_output_edge.set_resource(out_edge.get_resource())
subgraph.add_to_edge(new_output_edge, to_node)
subgraph.get_node(to_node).replace_edge(out_edge_id, new_output_edge.get_ident())

# Add remote write to main subgraph
edge_uid = uuid4()
remote_write = remote_pipe.make_remote_pipe([new_edge.get_ident()], [stdout.get_ident()], HOST, DISCOVERY_PORT, False, edge_uid)
main_graph.add_node(remote_write)
# Add remote pipe to write from subgraph to main graph
create_remote_pipe_from_input_edge(from_subgraph=matching_subgraph, to_subgraph=subgraph, edge=new_output_edge, host=HOST, port=DISCOVERY_PORT, file_id_gen=file_id_gen)

# Add remote read to current subgraph
ephemeral_edge = file_id_gen.next_ephemeral_file_id()
subgraph.replace_edge(in_edge.get_ident(), ephemeral_edge)

remote_read = remote_pipe.make_remote_pipe([], [ephemeral_edge.get_ident()], HOST, DISCOVERY_PORT, True, edge_uid)
subgraph.add_node(remote_read)
create_remote_pipe_from_output_edge(from_subgraph=subgraph, to_subgraph=matching_subgraph, edge=out_edge, host=worker.host(), port=DISCOVERY_PORT, file_id_gen=file_id_gen)

# Replace non ephemeral input edges with remote read/write
for worker, subgraph in worker_subgraph_pairs:
nodes = list(subgraph.nodes.keys())
for source in nodes:
for in_edge in subgraph.get_node_input_fids(source):
# If we didn't expand HDFSCat then we shouldn't modify it's input fids
# We might need annotation changes if we need to be more general
if isinstance(subgraph.get_node(source), HDFSCat):
continue
if in_edge.has_file_resource() or in_edge.has_file_descriptor_resource():
old_edge_id = in_edge.get_ident()

# In case this fid from and to nodes are in the same graph
# we need to both read/write the fid from/to the main machine
# TODO: we might want to optimize this by offloading everything to the
# main subgraph
from_node = subgraph.get_edge_from(old_edge_id)
if from_node:
# remove previous connection
subgraph.set_edge_from(old_edge_id, None)

# Add new connectiong
new_output_edge = file_id_gen.next_file_id()
new_output_edge.set_resource(in_edge.get_resource())
subgraph.add_from_edge(from_node, new_output_edge)
subgraph.get_node(from_node).replace_edge(old_edge_id, new_output_edge.get_ident())

# Add remote pipe to write from subgraph to main graph
create_remote_pipe_from_output_edge(from_subgraph=subgraph, to_subgraph=main_graph, edge=new_output_edge, host=worker.host(), port=DISCOVERY_PORT, file_id_gen=file_id_gen)

create_remote_pipe_from_input_edge(from_subgraph=main_graph, to_subgraph=subgraph, edge=in_edge, host=HOST, port=DISCOVERY_PORT, file_id_gen=file_id_gen)
else:
# sometimes a command can have both a file resource and an ephemeral resources (example: spell oneliner)
continue
Expand Down
Loading