bioKepler 1.2 User Guide

December 2015

Table of Contents

  1. Introduction
  2. Downloading and Installing the bioKepler Suite
  3. Demo Workflows
  4. bioActors
  5. Distributed Data-Parallel Framework
  6. References

1. Introduction

The bioKepler suite facilitates the development of Kepler workflows for the integrated execution of bioinformatics applications in distributed environments. It contains a specialized set of actors for running bioinformatics tools, directors providing distributed data-parallel (DDP) execution on different computational resources, and example workflows demonstrating how to use these actors and directors. This document describes each of these components and how to install the bioKepler suite.

Note: This guide assumes that you are familiar with Kepler actors, directors, parameters, and ports, and know how to build workflows. For more information about these topics, see the Kepler Getting Started Guide and the Kepler User Manual.

This work is supported by NSF ABI Award DBI-1062565 for bioKepler, NIH Award P41 GM103426 for NBCR, and the Gordon and Betty Moore Foundation award to CalIT2 at UCSD for the CAMERA project. We also thank the FutureGrid project for experimental environment support.

To cite bioKepler, please use the following reference:

I. Altintas, J. Wang, D. Crawl, W. Li, “Challenges and approaches for distributed workflow-driven analysis of large-scale biological data”, In the Proceedings of the Workshop on Data analytics in the Cloud at EDBT/ICDT 2012 Conference, DanaC2012, 2012, pp 73-78.

2. Downloading and Installing the bioKepler Suite

The bioKepler suite can be downloaded and installed using the Kepler Module Manager:

  1. In the Kepler application menu, select Tools => Module Manager.
  2. In the Module Manager dialog, select the Available Modules tab.
  3. Select biokepler-1.2 from the list of Available Suites, and then click the right arrow button to move the bioKepler-1.2 suite to the list of Selected Modules.
  4. Click the Apply and Restart buttons to download bioKepler and restart Kepler.

3. Demo Workflows

The bioKepler suite contains over 40 example workflows that demonstrate the bioKepler actors and directors. These demo workflows can be accessed from the actor library in the Kepler UI by expanding the bioKepler node, as shown in Figure 1. To open a demo workflow, double-click on its node.


Figure 1: The bioKepler demos in the actor library.

Figure 2 shows the demo workflow "blast.xml", which executes blastall for a set of query sequences on a reference database. The blastall bioActor reads two inputs that specify the FASTA query file and the pre-formatted reference file, and, after running blastall, produces a single output, alignmentFile, which is the name of the result file containing the alignments. After blastall finishes, the contents of the result file are displayed in a popup window. Section 4 describes bioActors in more detail.

Figure 2: A workflow demonstrating the blastall bioActor.

Most of the bioKepler demo workflows have a set of workflow parameters specifying the data input and output locations. In the blast.xml workflow, these parameters are query, reference, and outputdir, and are shown in the top right of the workflow canvas. In the bioKepler demo workflows, the input files are located in $HOME/KeplerData/workflows/module/bioKepler-1.2.0/data/<name>, where <name> is the workflow name. Additionally, most demo workflows write output files to $HOME/KeplerData/workflows/module/bioKepler-1.2.0/data/output, but a few write output files in the input directory. Note that some bioinformatics tools executed by these workflows do not run correctly if the output files already exist; if an error occurs while running a demo workflow, try removing the output data.

4. bioActors

The bioKepler suite provides a set of actors called “bioActors” that execute bioinformatics programs/tools. In general, each bioActor executes a single application or tool, e.g., the blastall bioActor runs blastall, the bowtie bioActor runs bowtie, etc. Note that the bioKepler suite does not include these applications; to use the bioActor for, e.g., blastall, you need to install blastall and place the binary in your $PATH.

Figure 3: bioActors in the Kepler actor library.

bioActors can be accessed from the Kepler UI actor library by expanding Disciplines, Biology as shown in Figure 3. There are two sets of bioActors: bioKepler and Bio-Linux. The bioActors inside bioKepler are configured with specific inputs, outputs, parameters, and documentation. The Bio-Linux bioActors, however, are mostly “unconfigured”, with only a single input and output[1]. These actors represent most of the 500+ biology-related tools that come with the Bio-Linux distribution[2]. Because of this large number, it is not practical to provide configured inputs, outputs, etc., for each of these bioActors. Additionally, some applications in Bio-Linux do not have bioActors; however, you can create and configure bioActors for them using the Execution Choice configuration dialog, as explained in the next section.

The bioActors under either bioKepler or Bio-Linux are grouped into major categories such as Alignment, Assembly, Clustering, RNA-seq, etc. To locate a bioActor, e.g., bwa, you can browse the bioActor tree or directly search from the Kepler search panel.

4.1. Execution Choice Actor

The bioActors in the bioKepler suite are all instances of the Execution Choice actor that is configured to execute bioinformatics tools. In general, the Execution Choice actor consumes one or more input files, executes the application, and produces one or more output files.  The actor itself does not read or write the contents of the files, but reads and writes the names of the files through its input and output ports, and passes these file names to the underlying application.

Execution Choice provides an abstract representation of executing an application; the application is actually run by a sub-workflow during workflow execution. A given application or command-line tool may be executed in different ways depending on factors such as input data size or available computational resources. For example, if the query sequences and reference database are small, blastall may be run locally on the same computer running Kepler. However, if the query or reference is large, it is more efficient to use the MapReduce framework [MapReduce] to execute blastall in a cluster or cloud environment.

The Execution Choice actor provides the ability to specify different sub-workflows to execute an application. A set of pre-defined sub-workflows, or templates, have been created that may be used when configuring an Execution Choice actor. The default sub-workflow, called LocalExecution, uses the External Execution actor to execute a command-line tool on the local machine. Each template is a pre-built sub-workflow that may be modified in the Kepler UI, just as one would any other sub-workflow. Additionally, you may create a new sub-workflow to run the application instead of choosing one of the templates.

bioKepler includes the following templates:

  • CrossReduce: Execute the program in parallel on the local computer or a cluster using the Cross and Reduce actors. The template is based on the BLAST CrossReduce sub-workflow.
  • JobSubmission: Copy local input files to a remote cluster, execute the program on the remote cluster, and copy the results back to the local machine using the GenericJobLauncher and other actors.
  • LocalExecution: Execute the program on the local computer using the External Execution actor.
  • MapOnly: Execute the program in parallel on the local computer or a cluster using the Map actor. The template is based on the blastall MapOnly sub-workflow.
  • MapReduce: Execute the program in parallel on the local computer or a cluster using the Map and Reduce actors. The template is based on the blastall MapReduce sub-workflow.
  • RESTService: Execute a REST service using the RestService actor, based on http://www.biomart.org/biomart/martservice.
  • WebService: Execute a Web service using the WebService actor, based on http://npd.hgu.mrc.ac.uk/soap/npd.wsdl.

An Execution Choice actor has zero or more Inputs and Outputs, each of which is either a Data or File type. For a Data Input or Output, the actor has an input or output port, respectively, and reads or writes tokens through these ports like any other actor. Additionally, each sub-workflow has the corresponding input and output ports, and these ports behave the same as ports inside composite actors, i.e., tokens consumed by an outside port are available to the port inside the sub-workflow, and tokens written to a port inside the sub-workflow are produced by the corresponding outside port. Data Inputs and Outputs are identical to input and output ports for composite actors, and can be used to transfer any kind of tokens.

File Inputs and Outputs, however, transfer string tokens containing file names. A File Input or Output has both an input or output port, respectively, and a parameter with the same name. For Inputs, if the input port is connected, then Execution Choice reads the file name from the connected actor; otherwise the parameter value is written to the port inside the sub-workflow. (This is the same behavior as PortParameters.) As with Data Inputs, each File Input has a corresponding input port inside each sub-workflow.

File Outputs have both a parameter and an output port. However, unlike File Inputs, the value of the parameter, i.e., the name of an output file, is always written to the output port; this occurs when the sub-workflow finishes executing. Additionally, for File Outputs there are no corresponding ports inside the sub-workflow. (The inside ports are not shown since they cannot be used.)

Since an Execution Choice actor should work with any of its sub-workflows, its input/output ports are the same in all sub-workflows. If a port inside of a sub-workflow is not connected, its data will not be used in that sub-workflow.

We recommend using File Inputs/Outputs instead of Data Inputs/Outputs for bioinformatics tools, especially for large data sets, so that there will be less data token transfer overhead in Kepler.

Figure 4: Configuration dialog for the Execution Choice actor.

An Execution Choice actor can be configured by double-clicking on the actor on the workflow canvas. Figure 4 shows the configuration dialog for the blastall bioActor. The dialog is organized into different tabs, one for each sub-workflow, and a Shared Options tab for the overall configuration. As shown in the figure, blastall has sub-workflows named CrossReduce, JobSubmission, and LocalExecution. (There are additional sub-workflows, not shown, for MapReduce and MapOnly.)

The Shared Options tab configures how to run the application, the set of Input File Parameters, Output File Parameters, Parameters, and which sub-workflow to execute. The program parameter specifies the name of the application or command-line tool, and unless you are customizing Execution Choice to run a new tool, i.e., creating a new bioActor, this parameter should not be changed. The Input and Output File Parameters sections display the parameters for File Inputs and Outputs.

File Output Parameters also includes a boolean parameter checkOutputTimestamp, which is used to verify the sub-workflow execution actually produced output. If checkOutputTimestamp is true, Execution Choice checks the modification timestamp of each output file before and after the application executes. If the timestamp has not increased, then Execution Choice throws an error.
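The check itself is simple. Below is a minimal Java sketch of the logic (illustrative only, not bioKepler's actual implementation; the class and method names are hypothetical): the output file's modification time is recorded before the tool runs and must have increased afterwards.

    import java.io.File;

    // Illustrative sketch of the checkOutputTimestamp behavior.
    final class TimestampCheck {
        // Run the tool, then verify that the output file's modification time increased.
        static void verifyOutputUpdated(String outputFile, Runnable runTool) {
            long before = new File(outputFile).lastModified(); // 0 if the file does not exist yet
            runTool.run();                                      // execute the application
            long after = new File(outputFile).lastModified();
            if (after <= before) {
                throw new IllegalStateException("Output file was not produced or updated: " + outputFile);
            }
        }
    }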

The Parameters section lists the actor parameters, which may be used to specify command-line arguments. As shown in Figure 4, the parameter eValue is for the “-e” argument and has a default value of 1E-5. Similar to other actor parameters, parameters defined here may reference other parameters defined in the same actor or defined in a scope above the Execution Choice actor. Parameters defined here may also be referenced in sub-workflows. All Execution Choice actors have an additionalOptions parameter, which can be used to specify command-line arguments without creating new parameters.

Figure 5: MapOnly tab in ExecutionChoice actor.

Figure 6: MapOnly sub-workflow in ExecutionChoice actor.

The tabs for the sub-workflows display parameters for that sub-workflow. Figure 5 shows the tab for MapOnly and Figure 6 shows the MapOnly sub-workflow. As seen in the figures, the parameters in the tab are defined at the top level of the sub-workflow.

All sub-workflows contain a special commandLine parameter. This parameter represents the full command line used when executing the sub-workflow. For example, in the LocalExecution sub-workflow for blastall, commandLine is the program name followed by each of the parameters with their arguments, the input file names, and finally the output file name: “blastall -p blastn -m 8 -e 1E-5 -d Ocean_Alaska.fa -i testQuery.fa > blast.out”. The commandLine parameter in each sub-workflow is automatically updated whenever File Inputs, File Outputs, or parameters are added, removed, or renamed, or an argument is changed. Ideally, you will not have to modify commandLine, since the Execution Choice actor constructs it automatically from these components. However, you may modify commandLine like any other parameter. Also, the arguments in commandLine often differ between sub-workflows, because each sub-workflow may organize the arguments in its own way.

Note: The configuration dialog for the Execution Choice actor is different from the configuration dialog for other actors. Besides parameter configuration, this dialog should also be used to modify the ports of Execution Choice actors, instead of the ‘Configure Port’ dialog normally used for other actors.

The buttons at the bottom of the Execution Choice configuration dialog display different dialogs, as described below:

Figure 7: The Add Dialog.

The Add button is used to add components to the Execution Choice actor. It displays the Add Dialog shown in Figure 7. The user first chooses the type of component to add. If the type is a File Input, File Output, or parameter, a command-line argument can be specified; this argument is placed in the commandLine parameter along with the input or output file name or parameter value. All types of components must be given a unique name before they can be added.

For Input or Output components, the data type can either be File or Data. If the type is File, then the tokens are assumed to be file names and only ports for Inputs are created inside the sub-workflows as described above. If the type is Data, input and output ports are created inside the sub-workflows and the tokens can contain anything.

A parameter can either be numeric or a string. In the latter case quotes are not needed.

Sub-workflows from the template library can also be added from this dialog. If the Execution Choice button is selected, you can choose the template from the Choice combobox. To add a sub-workflow from a MoML XML file not in the template library, use the Import button instead of Add.

The Arguments button displays the dialog shown in Figure 8. This dialog allows you to change the value of an Input, Output, or parameter's argument.

Figure 8: The Arguments Dialog.

Figure 9: The Export Dialog.

The Cancel button reverts any changes to parameter values and closes the configuration dialog. The Commit button applies the changes and closes the configuration dialog.

The Export button lets you save a sub-workflow as a MoML XML file. You can then add the file to the template library or share it with other workflow developers. Clicking on the Export button shows the dialog in Figure 9, in which you choose the sub-workflow to export; clicking OK shows a File Save dialog to choose the name of the XML file.

A sub-workflow may also be loaded from a MoML XML file not in the template library. Clicking on the Import button shows a file dialog where you can select the file to load. Next, another dialog lets you choose the name of the sub-workflow.

Figure 10: The Remove Dialog.

Inputs, Outputs, parameters, or sub-workflows can be removed by clicking on the Remove button. In the Remove Dialog, shown in Figure 10, first specify the type of component to remove, and then the component's name. If parameter is selected, you can choose whether the parameter to remove is defined globally, i.e., displayed in Shared Options, or in one of the sub-workflows.


Figure 11: The Rename Dialog.

Click on the Rename button to show the dialog in Figure 11. This dialog allows you to rename Inputs, Outputs, parameters, or sub-workflows. Each of these components can be renamed in the Shared Options tab, and parameters defined in sub-workflows may be renamed by clicking on the sub-workflow tabs.

Figure 12: The Data Types Dialog.

Clicking the Types button shows the dialog in Figure 12. This dialog lets you change the types for File Inputs or Outputs (either File or Data), or change parameter types (numeric or string).

5. Distributed Data-Parallel Framework

The bioKepler suite includes the Kepler Distributed Data-Parallel (DDP) framework for data-parallel execution on distributed computational resources. DDP patterns facilitate data-intensive applications and workflows, which can execute in parallel with partitioned data on distributed computing nodes. The advantages of these patterns include: (i) they support data distribution and parallel processing of distributed data on multiple nodes/cores; (ii) they provide a higher-level programming model that facilitates program parallelization; (iii) they follow a “moving computation to data” principle that reduces data movement overheads; (iv) they scale well and accelerate performance when executing on distributed resources; (v) they support run-time features such as fault tolerance; and (vi) they simplify parallel programming in comparison to traditional parallel programming interfaces such as MPI and OpenMP.

Figure 13: A DDP MapReduce workflow to execute blastall.

Figure 13 shows a DDP workflow that uses MapReduce to execute blastall. This workflow contains composite actors for Map and Reduce patterns, atomic actors for data partitioning and merging, and a DDP Director to execute the workflow on a Big Data execution engine such as Hadoop. The rest of this section describes these types of components in more detail.

Note: The actors and director described in this section are meant to be used together: other actors will not work with the DDP Director. To use non-DDP and DDP actors in the same workflow, place the DDP actors in a sub-workflow.

5.1. Pattern Actors

The DDP framework includes a set of special composite actors, called DDP pattern actors, that provide an interface to data-parallel patterns. Each DDP pattern actor corresponds to a particular data-parallel pattern, and actors have been implemented for Map, Reduce, CoGroup, Match, and Cross.  The semantics of these patterns are defined in Table 1, and more information can be found in [PACT].

Map

  • Independently processes each <key, value> pair.
  • The user-defined function can be executed in parallel for the input keys.

Reduce

  • Partitions the <key, value> pairs based on their keys.
  • All pairs with the same key are processed together in one instance of the user-defined function.
  • Executions for different keys can be done in parallel.

CoGroup

  • Partitions the <key, value> pairs of the two input sets based on their keys.
  • For each input, all values with the same key form one subset.
  • Each user-defined function instance receives three inputs: a key and the two value subsets for that key.
  • Executions for different keys can be done in parallel.
  • If a key appears in only one input set, the value subset for this key from the other input will be empty.

Match

  • Partitions the <key, value> pairs of the two input sets based on their keys.
  • Value pairs from the Cartesian product of the two value subsets for the same key are processed together.
  • Each value pair from the product and the key will be processed by an instance of the user-defined function.
  • Executions for different inputs can be done in parallel.
  • If a key appears in only one input set, no user-defined function will be instantiated for this key.

Cross

  • Processes all elements from the Cartesian product of the <key, value> pairs of the two input sets.
  • Each instance of the user-defined function processes one element from the Cartesian product and its inputs are two <key, value> pairs from the two input sets respectively.
  • Executions for different inputs can be done in parallel.

Table 1: A description of DDP patterns.

Similar to other actors, the DDP actors can be linked together to form a chain of tasks, and can be nested hierarchically as part of larger workflows. Each DDP actor corresponds to a single pattern, thereby allowing greater flexibility in expressing workflow logic. For example, one or more DDP actors can depend on a single upstream Map actor.

A DDP pattern actor provides two important pieces of information: the data-parallel pattern, and the tasks to perform once the pattern has been applied. The type of data-parallel pattern specifies how the data is grouped among computational resources, as described in Table 1. For example, Reduce combines all values with the same key into a single group, while Map performs no grouping and processes each key independently.

Once the data is grouped, a set of processing steps is applied to it. DDP pattern actors provide two mechanisms for specifying these steps: choosing a predefined function or creating a sub-workflow. In the former case, you implement the processing as a Java class and specify its fully-qualified name in the executionClass parameter.
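As an illustration of the predefined-function route, the sketch below shows a user-defined Map function written against the standard Hadoop MapReduce API (org.apache.hadoop.mapreduce.Mapper). The class name and its processing are hypothetical, and the exact base class expected by the DDP framework's executionClass parameter may differ from plain Hadoop, so treat this only as the general shape of such a class.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical Map user-defined function: for each input record, emit the
    // record's first whitespace-delimited token as the key and the record's
    // length as the value.
    public class RecordLengthMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString().trim();
            if (record.isEmpty()) {
                return; // skip empty records
            }
            String id = record.split("\\s+", 2)[0];
            context.write(new Text(id), new LongWritable(record.length()));
        }
    }

Such a class would be compiled into a jar listed in the jars parameter, its fully-qualified name given as executionClass, and its input and output key/value types described by inKeyValueTypes and outKeyValueTypes.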

Figure 14: Sub-workflow for Map.

Figure 15: Sub-workflow for Reduce.

DDP pattern actors can also use a sub-workflow to specify the data processing tasks. In this case, you build a sub-workflow in Kepler using actors and directors. Figure 14 and Figure 15 show the sub-workflows for Map and Reduce, respectively. As with other Kepler sub-workflows, these contain actors linked together to perform tasks. The Map sub-workflow runs the command-line tools formatdb and blastall using External Execution actors, and the Reduce sub-workflow uses the BlastTabularResultMerge actor to merge the outputs from blastall.

Note: A director is required in the sub-workflow of each DDP pattern composite actor.  The DDP execution engine runs these sub-workflows as separate, top-level workflows, so the execution semantics of each sub-workflow need to be specified by choosing a director.

The DDP pattern actors have the following parameters:

  • degreeOfParallelism: The number of instances of this pattern the execution engine should run in parallel.
  • displayRedirectDir: The directory where display-related actors in DDP pattern sub-workflows save their outputs. If it is not specified, any display actors in the sub-workflow are removed before execution.
  • executionClass: The fully-qualified name of the Java class implementing the user-defined function. If this parameter is specified, the sub-workflow is ignored; otherwise the sub-workflow is used as the user-defined function. To use a class not included with Kepler, add the class's jar to the jars parameter.
  • inKeyValueTypes: If the executionClass is used, this specifies the data types for the input keys and values.
  • jars: A comma-separated list of jar files to include when executing this pattern. The full path to each jar must be specified. The jars may contain the class specified in the executionClass parameter, or any classes needed by actors in the sub-workflow.
  • outKeyValueTypes: If the executionClass is used, this specifies the data types for the output keys and values.
  • printExeSummary: If true, print a line containing runtime information to the log file each time the sub-workflow executes. Note: this can significantly slow down execution and should only be used for debugging.
  • runWorkflowLifecyclePerInput: If true, the full lifecycle (initialization and wrapup) of the sub-workflow is executed for each input, which allows multiple outputs to be produced per input. If false, a single iteration of the sub-workflow is performed for each input and only a single output may be generated. Performance is much faster when this is set to false.

The Map actor also has the following parameter:

  • partitionerClass: The name of the PartitionerClass used only by Hadoop for partitioning Map inputs.

The Reduce actor also has the following parameters:

  • combineExecutionClass: The fully-qualified name of the Java class implementing a Reduce function to pre-reduce the data after a Map.
  • groupingComparatorClass: The name of the GroupingComparatorClass, used only by Hadoop for grouping input data of reducers.
  • useAsCombiner:  If true, use the sub-workflow or execution class to pre-reduce the data after a Map.

5.2. I/O Actors

The DDP framework also includes I/O actors for reading and writing data between the data-parallel patterns and the underlying storage system. The DDPDataSource actor specifies the location of the input data in the storage system as well as how to partition the data. Similarly, the DDPDataSink actor specifies the data output location and how to merge the data. The partitioning and merging methods depend on the format of the data and are application-specific.

The DDP I/O actors have the following parameters:

  • degreeOfParallelism: The number of instances to execute in parallel.
  • formatType: The name of the data format or a fully-qualified Java class to parse the data.
  • jars: A comma-separated list of jars to use when executing this actor. The full path to each jar must be specified.
  • keyValueTypes: The key and value types of the atomic pattern actor. This parameter should only be set when a class name is specified for the formatType parameter.

DDPDataSource also has the following parameters:

  • chunkSize: If greater than one, the input data array is split into sub-arrays whose size is at most this value; for example, with chunkSize = 100, an input array of 250 tokens could be split into sub-arrays of 100, 100, and 50 elements. This parameter is only used when the formatType is TokenInputFormat.

DDPDataSink also has the following parameters:

  • mergeMultiPartOutputs: If true, merge multiple parts of an output file into a single file.

An I/O actor may have additional parameters based on the format type. Table 2 describes the parameters used for each format type. When the formatType parameter is changed in the configuration dialog, the dialog must be closed and re-opened to see the parameters associated with the newly selected type.

Each format type is listed below with its key and value types and any format-specific parameters:

  • FASTAInputFormat (key: string, value: string): Input format that splits FASTA format datasets. Parameter SequenceNumberPerExecution: the maximum number of sequences processed per sub-workflow execution (default 5000).
  • FASTAInputFormatBySize (key: string, value: string): Input format that splits FASTA format datasets by size. Parameter SequenceSizePerExecution: the maximum sequence size (unit: MB) processed per sub-workflow execution (default 1).
  • DelimitedTextInputFormat (key: long, value: string): Input format that splits text input files using a delimiter. Parameter Delimiter: the delimiter string used to split the data (default "\n").
  • FilePathInputFormat (key: string, value: string): Input format that reads file paths; the key is the file name (no path information) and the value is the file path. No additional parameters.
  • LineInputFormat (key: long, value: string): Input format that splits text input files by line. No additional parameters.
  • LineOutputFormat (key: string, value: string): Output format that writes key-value pairs, one pair per line, with the key and value separated by '\t'. No additional parameters.
  • NullOutputFormat (key: string, value: string): Output format that discards the data. No additional parameters.
  • TokenInputFormat (key: nil, value: same type as the array element): Input format that reads an array of tokens. No additional parameters.
  • TokenOutputFormat (key: same type as the array key, value: same type as the array value): Output format that creates an array of tokens. No additional parameters.
  • ValueOnlyOutputFormat (key: string, value: string): Output format that writes only the values, one value per line. No additional parameters.

Table 2: A description of I/O format types.

The format types are implemented as Java classes that are specific to the underlying execution engine. The classes for the format types in Table 2 are contained in the jars included in the bioKepler suite.

The Token Formats transfer tokens between actors in the workflow and actors in the DDP Pattern sub-workflows. These formats read and write arrays of tokens instead of using a file path: DDPDataSource reads an array of tokens and splits the values based on the degree of parallelism, and DDPDataSink combines the tokens to output a token array.

New format types can be created by implementing the execution engine interfaces for data partitioning or merging. The format type class must be inside a jar on Kepler's class path, e.g., in one of the modules' lib/jar/ directories, or the jar must be specified in the jars parameter. Additionally, the configuration.xml in the Kepler module for the execution engine must be updated with the format name, Java class, and any additional parameters. For examples, see the Hadoop format types in kepler.modules/hadoop-1.2/resources/configuration/configuration.xml.
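As an example of what such a class might look like for the Hadoop engine, the sketch below defines a custom input format against the standard org.apache.hadoop.mapreduce API. The package and class names are hypothetical; the format disables file splitting so that each input file becomes a single partition, and it delegates record reading to Hadoop's LineRecordReader. A real format would also be registered in the engine's configuration.xml as described above.

    package org.example.ddp;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    // Hypothetical input format: each file becomes one split, read line by line.
    public class WholeFileLineInputFormat extends FileInputFormat<LongWritable, Text> {

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            // Keep each input file in a single split so that one sub-workflow
            // instance processes the whole file.
            return false;
        }

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) {
            // Reuse Hadoop's line reader: key = byte offset, value = line text.
            return new LineRecordReader();
        }
    }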

5.3. Director and Engines

The bioKepler suite contains the DDPDirector[3], which executes workflows composed of DDP pattern and I/O actors on a Big Data engine. Three engines are supported in bioKepler 1.2: Hadoop (version 2.2.0)[4], Stratosphere (version 0.4)[5], and Spark (version 1.5.0)[6]. For more details about how the DDP director executes workflows on Hadoop and Stratosphere, see [ICCS]. (The execution for Spark is similar.) If more than one execution engine is available, you can achieve flexible DDP workflow execution by simply switching the engine used for the DDP sub-workflow.

We have made a significant effort to ensure that DDP workflows execute the same way on all three engines. However, since each platform provides a different execution framework, configuration options, and programming API, there are some differences, which are described in Table 3. The categories in Table 3 are:

  • Patterns for Workflows: The pattern's execution is specified by a sub-workflow.
  • Patterns for Classes in Kepler: The pattern's execution is specified by a class name in a DDP Pattern actor's executionClass parameter.
  • Patterns for External Jars: The pattern's execution is specified by a class contained in a jar, and the name of the jar is specified by a DDP Pattern actor's jars parameter or the DDPDirector's includeJars parameter.
  • Display Redirect in Pattern Workflow: UI actors such as Display and the plotting actors can be placed inside a DDP Pattern sub-workflow; instead of displaying in a window, their output is written to a file. The DDPDirector's displayRedirectDir parameter specifies the directory in which to write these files.
  • Distributed Execution: Running workflows on a cluster of distributed nodes. To enable it, the startServerType parameter of the DDPDirector should be set to distributed.
  • Parallel Execution: Parallel execution of the DDP workflow, controlled by the degreeOfParallelism parameter of the DDPDirector.
  • HDFS for Input/Output: Support for reading and writing data in HDFS[7].
  • InputFormats: The input formats supported by DDPDataSource (described in Table 2).
  • OutputFormats: The output formats supported by DDPDataSink (described in Table 2).

Table 3: DDP Engine Supported Features

The DDPDirector performs tasks on the DDP execution engine whose configuration directory is specified by the configDir parameter. The format of the configuration files in this directory depends on the execution engine; they usually contain information such as:

  • host name and port of the master node or server
  • number and types of slave nodes
  • HDFS server name and port

Different configuration files may be used to run the execution engine in cloud, cluster, or single node environments. Configuration information can be found at:

During workflow initialization, the DDPDirector tries to connect to the execution engine server specified in configDir. If the director connects successfully, it submits a job based on the DDP actors. If the director fails to contact the server, it starts the server on the local computer. bioKepler includes configuration files, libraries, and executables to run Hadoop and Stratosphere in local execution mode. These files are located in:

  • $HOME/KeplerData/workflows/modules/hadoop-1.2/workflows/data
  • $HOME/KeplerData/workflows/modules/stratosphere-1.2/workflows/data

bioKepler 1.2 does not include the assembly jar required to run the standalone Spark server, because the jar is very large. You can build the assembly jar by following the instructions here.

Note: The Hadoop, Stratosphere, and Spark scripts work on Mac and Linux. If you want to run one of these platforms as a standalone server on Windows, you can try Cygwin although we have not extensively tested this configuration.

The DDPDirector contains the following parameters, which are used for all engines:

  • configDir: The directory containing the configuration for the execution engine.
  • degreeOfParallelism: The default number of instances of each DDP actor. This value is used if the actor’s degreeOfParallelism is less than or equal to zero.
  • displayRedirectDir: The directory where display-related actors in DDP pattern sub-workflows save their outputs. If it is not specified, any display actors in the sub-workflow are removed before execution.
  • includeJars: A comma-separated list of jars to be included for the job. For example, when we run DDP actors by specifying executionClass instead of a sub-workflow, this parameter specifies the jar containing the execution class. If the absolute path is not specified for a jar, then it is assumed to be in a Kepler module.
  • jobArguments: A comma-separated list of arguments for the job, e.g.,  org.kepler.hadoop.lineread.number=500, mapreduce.task.timeout=6000.
  • startServerType: The type of DDP engine to use. If sameJVM is selected, then the engine will run in the same JVM as Kepler. If distributed is selected, the DDPDirector will try to connect to a standalone server, and, if one is not running, the director will start one. More information on how to start distributed DDP engine can be found at https://kepler-project.org/developers/interest-groups/distributed/configuring-hadoop-for-biokepler-or-ddp-suite.
  • writeSubWorkflowsToFiles: If true, the MoML XML for each sub-workflow is written to a file in $HOME/nnnn, where nnnn is a randomly generated number printed to stdout when the workflow executes.

The Hadoop engine also has the following parameters:

  • autoHDFSStage: If true, input/output data on the local file system is automatically staged into/out of HDFS.
  • overwriteOutput: If true, the output directory is deleted before the Hadoop job is submitted.
  • removeTmpHDFSDir: If true, the temporary directory created in HDFS during execution will be removed after workflow execution.

The Spark engine also has the following parameters:

  • masterHostAndPort: A URL of the form spark://host:port that specifies a Spark standalone server host and port.
  • numSameJVMWorkers: When Spark runs in the same JVM as Kepler, this value specifies the number of worker threads to run. The degreeOfParallelism for DDP actors should be less than or equal to this value.

5.4. Best Practices

This section summarizes our best practices for setting the degreeOfParallelism parameter for the different DDP patterns. The most up-to-date version can be found here.

A good configuration rule for Map, Reduce, CoGroup, and Match is to set the degreeOfParallelism values of all the DDPDataSource, DDPDataSink, and DDP Pattern actors to the targeted degree of parallelism.

For the DDP Cross pattern, we recommend setting its degreeOfParallelism to the product of the two degreeOfParallelism values of its inputs; for example, if the two inputs have degreeOfParallelism values of 4 and 8, set the Cross actor's degreeOfParallelism to 32. The degreeOfParallelism value of DDPDataSink should be 0.

Some DDP engines have different parallel execution behavior when executing in the same JVM as Kepler. Hadoop uses only a single thread, and degreeOfParallelism is ignored. Spark must be configured with the number of workers via the numSameJVMWorkers parameter, and degreeOfParallelism should be less than or equal to this value.

6. References

[MapReduce] J. Dean, S. Ghemawat, Mapreduce: Simplified data processing on large clusters, Communications of the ACM 51 (1) (2008) 107–113.

[PACT] D. Battre, S. Ewen, F. Hueske, O. Kao, V. Markl, D. Warneke, Nephele/PACTs: A programming model and execution framework for web-scale analytical processing. In Proceedings of the 1st ACM Symposium on Cloud Computing, SoCC '10, ACM, New York, NY, USA, 2010, pp. 119-130.

[ICCS] Jianwu Wang, Daniel Crawl, Ilkay Altintas. A Framework for Distributed Data-Parallel Execution in the Kepler Scientific Workflow System. Workshop on Advances in the Kepler Scientific Workflow System and Its Applications at ICCS 2012, pp. 1620-1629.


[1] There are several bioActors in both bioKepler and Bio-Linux that are fully configured.

[2] Bio-Linux distribution: http://environmentalomics.org/bio-linux/

[3] bioKepler 1.0 included StratosphereDirector and HadoopDirector. These directors are also included in bioKepler 1.2 for backwards compatibility, but are deprecated in favor of DDPDirector.

[4] Apache Hadoop: http://hadoop.apache.org

[5] Stratosphere: http://stratosphere.eu

[6] Apache Spark: http://spark.apache.org

[7] Hadoop Distributed File System (HDFS): http://wiki.apache.org/hadoop/HDFS