Skip to content
Open
8 changes: 8 additions & 0 deletions maestrowf/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,14 @@ def initialize(self, batch_info, sleeptime=60):
# Write metadata
self._exec_dag.set_adapter(batch_info)
self._study.store_metadata()

LOGGER.debug("Exporting dag.")
if self._study.draw:
# Setup base name for each format option
dag_basename = os.path.join(self._study.output_path,
'dag_{}_'.format(self._study.name))
self._exec_dag.export_dag_vis(dag_basename, self._study.draw)

self._setup = True

def monitor_study(self):
Expand Down
25 changes: 17 additions & 8 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def __init__(self, workspace, step, **kwargs):
self.script = kwargs.get("script", "")
self.restart_script = kwargs.get("restart", "")
self.to_be_scheduled = False
self.step = step
self._step = step
self.restart_limit = kwargs.get("restart_limit", 3)

# Status Information
# Status Information
self._num_restarts = 0
self._submit_time = None
self._start_time = None
Expand All @@ -82,11 +82,13 @@ def generate_script(self, adapter, tmp_dir=""):
else:
scr_dir = self.workspace.value

self.step.run["cmd"] = self.workspace.substitute(self.step.run["cmd"])
self._step.run["cmd"] = self.workspace.substitute(
self._step.run["cmd"]
)

LOGGER.info("Generating script for %s into %s", self.name, scr_dir)
self.to_be_scheduled, self.script, self.restart_script = \
adapter.write_script(scr_dir, self.step)
adapter.write_script(scr_dir, self._step)
LOGGER.info("Script: %s\nRestart: %s\nScheduled?: %s",
self.script, self.restart_script, self.to_be_scheduled)

Expand Down Expand Up @@ -122,12 +124,12 @@ def can_restart(self):
def _execute(self, adapter, script):
if self.to_be_scheduled:
srecord = adapter.submit(
self.step, script, self.workspace.value)
self._step, script, self.workspace.value)
else:
self.mark_running()
ladapter = ScriptAdapterFactory.get_adapter("local")()
srecord = ladapter.submit(
self.step, script, self.workspace.value)
self._step, script, self.workspace.value)

retcode = srecord.submission_code
jobid = srecord.job_identifier
Expand Down Expand Up @@ -230,7 +232,14 @@ def name(self):

:returns: The name of the StudyStep contained within the record.
"""
return self.step.name
return self._step.name

@property
def step(self):
    """Return the step object this record wraps."""
    return self._step

@property
def walltime(self):
Expand All @@ -239,7 +248,7 @@ def walltime(self):

:returns: A string representing the requested computing time.
"""
return self.step.run["walltime"]
return self._step.run["walltime"]

@property
def time_submitted(self):
Expand Down
14 changes: 14 additions & 0 deletions maestrowf/datastructures/core/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,20 @@ def apply(self, item):
# substrings.
return item

@property
def param_vals(self):
    """Return the dict of parameter values for this combination."""
    return self._params

@property
def param_labels(self):
    """Return the dict of parameter labels for this combination."""
    return self._labels


class ParameterGenerator:
"""
Expand Down
62 changes: 46 additions & 16 deletions maestrowf/datastructures/core/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class StudyStep:
def __init__(self):
"""Object that represents a single workflow step."""
self.name = ""
self.base_name = "" # Stores unparameterized name
self.description = ""
self.run = {
"cmd": "",
Expand All @@ -83,6 +84,9 @@ def __init__(self):
"reservation": ""
}

self._param_vals = {} # Better to be None?
self._param_labels = {}

def apply_parameters(self, combo):
"""
Apply a parameter combination to the StudyStep.
Expand All @@ -92,8 +96,13 @@ def apply_parameters(self, combo):
"""
# Create a new StudyStep and populate it with substituted values.
tmp = StudyStep()

base_name = tmp.name
tmp.__dict__ = apply_function(self.__dict__, combo.apply)
# Return if the new step is modified and the step itself.
tmp.base_name = base_name # Why doesn't this work here?
tmp._param_vals = combo.param_vals
tmp._param_labels = combo.param_labels

return self.__ne__(tmp), tmp

Expand Down Expand Up @@ -122,6 +131,20 @@ def __ne__(self, other):
"""
return not self.__eq__(other)

@property
def param_vals(self):
    """Return the dict of parameter values applied to this step."""
    return self._param_vals

@property
def param_labels(self):
    """Return the dict of parameter labels applied to this step."""
    return self._param_labels


class Study(DAG, PickleInterface):
"""
Expand Down Expand Up @@ -164,7 +187,8 @@ class Study(DAG, PickleInterface):
"""

def __init__(self, name, description,
studyenv=None, parameters=None, steps=None, out_path="./"):
studyenv=None, parameters=None, steps=None, out_path="./",
draw=False):
Comment thread
FrankD412 marked this conversation as resolved.
"""
Study object used to represent the full workflow of a study.

Expand Down Expand Up @@ -200,11 +224,13 @@ def __init__(self, name, description,
self.is_configured = False
self.add_node(SOURCE, None)

# Settings for handling restarts and submission attempts.
self._restart_limit = 0
self._submission_attempts = 0
self._use_tmp = False
self._dry_run = False
# Settings for handling restarts and submission attempts. Just set to
# defaults here.
self._restart_limit = 0 # Number of restarts before fail
self._submission_attempts = 0 # Submit attempts before fail
self._use_tmp = False # tmp dir for script/lock writing.
self._dry_run = False # Enables dry-run (disables submit)
self.draw = False # Set dag vis flag

# Management structures
# The workspace used by each step.
Expand Down Expand Up @@ -391,24 +417,25 @@ def setup_environment(self):

def configure_study(self, submission_attempts=1, restart_limit=1,
throttle=0, use_tmp=False, hash_ws=False,
dry_run=False):
dry_run=False, draw=[]):
"""
Perform initial configuration of a study. \
Perform initial configuration of a study.

The method is used for going through and actually acquiring each \
dependency, substituting variables, sources and labels. \
dependency, substituting variables, sources and labels.

:param submission_attempts: Number of attempted submissions before \
marking a step as failed. \
marking a step as failed.
:param restart_limit: Upper limit on the number of times a step with \
a restart command can be resubmitted before it is considered failed. \
:param throttle: The maximum number of in-progress jobs allowed. [0 \
denotes no cap].\
denotes no cap].
:param use_tmp: Boolean value specifying if the generated \
ExecutionGraph dumps its information into a temporary directory. \
ExecutionGraph dumps its information into a temporary directory.
:param dry_run: Boolean value that toggles dry run to just generate \
study workspaces and scripts without execution or status checking. \
:returns: True if the Study is successfully setup, False otherwise. \
study workspaces and scripts without execution or status checking.
:param draw: List of visualization dot style options [empty = no draw].
:returns: True if the Study is successfully setup, False otherwise.
"""

self._submission_attempts = submission_attempts
Expand All @@ -417,6 +444,7 @@ def configure_study(self, submission_attempts=1, restart_limit=1,
self._use_tmp = use_tmp
self._hash_ws = hash_ws
self._dry_run = dry_run
self.draw = draw

LOGGER.info(
"\n------------------------------------------\n"
Expand All @@ -426,10 +454,11 @@ def configure_study(self, submission_attempts=1, restart_limit=1,
"Use temporary directory = %s\n"
"Hash workspaces = %s\n"
"Dry run enabled = %s\n"
"Graph vis options = %s\n"
"Output path = %s\n"
"------------------------------------------",
submission_attempts, restart_limit, throttle,
use_tmp, hash_ws, dry_run, self._out_path
use_tmp, hash_ws, dry_run, self.draw, self._out_path
)

self.is_configured = True
Expand All @@ -446,7 +475,7 @@ def _stage(self, dag):
# Items to store that should be reset.
LOGGER.info(
"\n==================================================\n"
"Constructing parameter study '%s'\n"
"Constructing study '%s'\n"
"==================================================\n",
self.name
)
Expand Down Expand Up @@ -644,6 +673,7 @@ def _stage(self, dag):
self.step_combos[step].add(combo_str)

modified, step_exp = node.apply_parameters(combo)
step_exp.base_name = step_exp.name
step_exp.name = combo_str

# Substitute workspaces into the combination.
Expand Down
122 changes: 122 additions & 0 deletions maestrowf/datastructures/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from collections import deque, OrderedDict
import logging
from math import sqrt

from maestrowf.abstracts.graph import Graph

Expand Down Expand Up @@ -248,3 +249,124 @@ def _detect_cycle(self, v, visited, rstack):
rstack.remove(v)
logger.debug("No cycle originating from '%s'", v)
return False

def export_dag_vis(self, dag_basename, draw_opts):
    """
    Export a hierarchical representation of this study's DAG.

    Renders the DAG once per format listed in draw_opts, writing each
    output next to the study output path using dag_basename as the
    file-name prefix.

    :param dag_basename: Basename (path prefix) of each output file.
    :param draw_opts: Iterable of output format names:
        'mpl' (matplotlib png, spring layout),
        'mpl-dot' (matplotlib png, graphviz dot layout),
        'dot' (graphviz dot file),
        'graphml'/'graphml-dot' (graphml file with node positions).

    NOTE: must this re-call topological sort for safety?
    NOTE: Add optional node annotations/attributes (colors, shape, etc)
    NOTE: Add skeleton only format (unexpanded steps)
    NOTE: Add partial expansion of dag -> large workflows
    NOTE: What about node attributes when there are too many parameters
    to enumerate?
    """
    logger.debug("Exporting hierarchical representation of dag")

    # Imports stay local so the rest of the package keeps working when
    # the optional visualization dependencies are not installed.
    try:
        import matplotlib.pyplot as plt
        import networkx as nx

    except ImportError:
        logger.exception("Couldn't import graph drawing utilities; "
                         "disabling graph visualization.")
        return

    try:
        from networkx import nx_agraph
        have_pygv = True

    except ImportError:
        logger.exception("Error importing pygraphviz: dot "
                         "layout/output disabled.")
        have_pygv = False

    dagnx = nx.DiGraph()

    nodelist = self.topological_sort()
    node_labels = {}
    for node in nodelist:
        if node == '_source':
            node_label = 'Study'  # Try to get study name instead?
        else:
            this_step = self.values[node].step
            parts = ['{}\n'.format(this_step.base_name)]
            for var, value in this_step.param_vals.items():
                # Strip the token delimiters (first two and last chars)
                # to get the bare parameter name.
                parts.append('{}:{}\n'.format(var[2:-1], value))
            node_label = ''.join(parts)

        # Lazy %-style args avoid formatting when debug is disabled.
        logger.debug("Adding label to node %s: %s", node, node_label)

        node_labels[node] = node_label  # draw these later
        dagnx.add_node(node, label=node_label)

    for node in nodelist:
        edges = self.adjacency_table[node]

        dagnx.add_edges_from((node, child) for child in edges)
        logger.debug("Node %s has children: %s", node, edges)

    # Compute node positions for two layouts.
    # Note: work on something better for sizing/layout than these hacks
    # NOTE: check if this longest path computation is expensive
    longest_chain = len(nx.algorithms.dag_longest_path(dagnx))
    pos_spring = nx.spring_layout(dagnx, k=1 / sqrt(longest_chain))

    # Convert to pygraphviz agraph for dot layout.
    if have_pygv:
        pos_dot = nx_agraph.pygraphviz_layout(dagnx, prog='dot')
    else:
        # Fail-safe for matplotlib rendering.
        pos_dot = pos_spring

    for viz_format in draw_opts:
        # Spring layout for 'mpl'/'graphml'; dot layout for the rest.
        if viz_format in ("mpl", "graphml"):
            pos = pos_spring
        else:
            pos = pos_dot

        if viz_format in ("mpl", "mpl-dot"):
            # Scale the figure with the depth of the DAG so labels stay
            # legible for long chains.
            fig, ax = plt.subplots(figsize=(3 * longest_chain,
                                            2 * longest_chain))
            nx.draw_networkx(dagnx,
                             pos=pos,
                             ax=ax,
                             labels=node_labels,
                             node_size=500)
            plt.savefig(dag_basename + '.png', dpi=150)
            # Close the figure so repeated formats don't leak memory.
            plt.close(fig)

        if viz_format == "dot" and have_pygv:
            # Possible to pass networkx/pygraphviz agraph object around
            # when imports aren't available?
            nx_agraph.write_dot(dagnx, dag_basename + '.dot')

        if viz_format in ("graphml", "graphml-dot"):
            # Copy so position attributes do not leak into the graph
            # used by the other output formats.
            graphml_dag = dagnx.copy()
            # Add positions as node attributes. Graph.node was removed
            # in networkx 2.4; use the .nodes view instead.
            for gnode, (x, y) in pos.items():
                graphml_dag.nodes[gnode]['x'] = float(x)
                graphml_dag.nodes[gnode]['y'] = float(y)

            nx.write_graphml(graphml_dag, dag_basename + '.graphml')
Loading