2.3.10 Pipelines

A pipeline is a container for a series of data processing steps. The pipeline object allows you to get an existing pipeline or create a new one with a given name. With this object, you can set input data sources, add transforms, feature generators, feature selectors, feature transforms and classifiers. An example pipeline is shown below.

Examples:

client.pipeline = 'my_pipeline'

client.pipeline.set_input_query('ExampleQuery')

client.pipeline.add_transform('Windowing')

# Feature Generation
client.pipeline.add_feature_generator(
    [
        {'subtype_call':'Time', 'params':{'sample_rate':100}},
        {'subtype_call':'Rate of Change'},
        {'subtype_call':'Statistical'},
        {'subtype_call':'Energy'},
        {'subtype_call':'Amplitude', 'params':{'smoothing_factor':9}}
    ],
        function_defaults={'columns':sensor_columns},
    )

# Scale to 8 bit representation for classification
client.pipeline.add_transform('Min Max Scale')

# Perform Feature Selection to remove highly correlated features, features
 with high variance and finally recursive feature eliminate.
# (Note: Recursive feature elimination can be very slow for large data sets and large number of parameters,
#        it is recommended to use other feature selection algorithms to first reduce the number of features)

client.pipeline.add_feature_selector(
    [
        {"name": "Correlation Threshold","params":{'threshold':0.85}},
        {"name": "Variance Threshold",
                            "params":{
                                'threshold':0.05
                                    }
        },
        {"name":"Recursive Feature Elimination",
                            "params":{
                                    "method":"Log R",
                                    "number_of_features":20
                                    }
        },
    ],
        params = {"number_of_features":20}
    )

client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation',
                                        params={'number_of_folds':3})

client.pipeline.set_classifier('PME',
            params={'classification_mode':'RBF',
                    'distance_mode':'L1'
                    })

client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization',
                    params = {'number_of_neurons':10})

client.pipeline.set_tvo({'validation_seed':0})

results, stats = client.pipeline.execute()

The final step is where the pipeline is sent to the Cloud Engine for execution. When the job is completed, the results are returned to you as a model object if classification has occurred or a DataFrame if you are at an intermediary step.

client.pipeline.execute()

class mplabml.pipeline. Pipeline(kb, name=None)

Base class of a pipeline object

add_augmentation(augmentation, params={})

Add augmentation set to the pipeline.

Parameters
  • augmentation (List) – List of dictionaries containing time series augmentations

  • params (dict , {}) – Parameters of the feature selector set.

Examples

>>> client.pipeline.add_augmentation([{"name": "Add Noise", "params": {"scale": [0.05, 0.1, 0.15 ],  # , 0.1, 0.15
>>>                                                                 "target_labels": ['idle','walking'],
>>>                                                                 "target_sensors": ['AccelerometerX', 'GyroscopeX']
>>>                                                                }}
>>>                                   ])
add_feature_generator(feature_generators, params={}, function_defaults={}, return_generator_set=False)

Add a feature generator set to the pipeline.

Parameters
  • feature_generators (list) – List of feature generators. As names or dictionaries.

  • params (dict , {}}) – Parameters to apply to the feature generator set

  • function_defaults (dict , {}}) – Parameters to apply to all individual feature generators

Examples

>>> # Add a single feature generator
>>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}],
                                       function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by name when they use the same same function defaults
>>> client.pipeline.add_feature_generator(['Mean', 'Standard Deviation', 'Skewness', 'Kurtosis', '25th Percentile',
                                       '75th Percentile', '100th Percentile', 'Zero Crossing Rate'],
                                       function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions using function defaults
>>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}],
                                        function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by subtype which use different parameters; note all subtypes will take the same inputs
>>> client.pipeline.add_feature_generator([{'subtype_call': 'Area', 'params': {'sample_rate': 100, 'smoothing_factor': 9}},
                                        {'subtype_call': 'Time', 'params': {'sample_rate': 100}},
                                        {'subtype_call': 'Rate of Change'},
                                        {'subtype_call': 'Statistical'},
                                        {'subtype_call': 'Energy'},
                                        {'subtype_call': 'Amplitude', 'params': {'smoothing_factor': 9}},
                                        {'subtype_call': 'Physical', 'params': {'sample_rate': 100}}
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                       )
>>> # Mix subtype and specify additional feature generators
>>> client.pipeline.add_feature_generator([{'subtype_call': 'Statistical'},
                                        {'name': 'Downsample', 'params': {'new_length': 5}},
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                        )
>>> # Call the same feature generators multiple times with different parameters
>>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5, 'columns': sensor_columns[0]}},
                                        {'name': 'Downsample', 'params': {'new_length': 12}},
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                        )
add_feature_selector(feature_selectors, params={})

Add a feature selection set to the pipeline.

Parameters
  • feature_selectors (List) – List of dictionaries containing feature selectors

  • params (dict , {}) – Parameters of the feature selector set

Examples

>>> client.pipeline.add_feature_selector([{"name":"Recursive Feature Elimination", "params":{"method":"Log R"}}],
>>>                                    params = {"number_of_features":20})
add_linear_step(func)

Add a step to the pipeline. Automatically tie the previous step and current step.

Parameters

func (function) – A mplabml function method call

add_segmenter(name, params={})

Add a Segmenter to the pipeline.

Parameters
  • name (str) – Name of the segmenter method to add

  • params (dict , optional) – Dictionary containing the parameters of the transform

add_transform(name, params={}, overwrite=True)

Add a Transform to the pipeline.

Parameters
  • name (str) – Name of the transform method to add

  • params (dict , optional) – Dictionary containing the parameters of the transform

  • overwrite (boolean) – When adding multiple instances of the same transform, set overwrite to False for the additional steps and the first instance will not be overwritten. (default is True)

auto(auto_params, run_parallel=True, lock=True, silent=True, renderer=None, version=2)

Execute automated pipeline asynchronously

The automated pipeline finds optimal parameters with a genetic algorithm. The genetic algorithm starts with a user-defined randomized population (pipelines) and generates models from them and tests them, keeps a subset of high-performing combinations, then recombines those survivors and repeats the process over again. The offspring of good parameter combinations are usually also good and sometimes are significantly better than their parents. As the algorithm iterates each successive generation, it often finds a near-optimal model without trying as many configurations. The algorithm terminates when the desired number of iterations are complete.

The created pipelines are run in parallel in MPLABML servers and results are ranked by the fitness score, which takes into account the model’s F1 score, precision, sensitivity and other metrics. The automation options and definition of the performance metrics are given below.

Parameters
  • params (dict) – Parameters of the genetic algorithm to optimize the pipelines. Definition and options of the parameters are given below. - search_steps, (list, [‘selectorset’, ‘tvo’]): it is used to define which libraries in the pipelines will be optimized.
    • population_size, (int, 10): Initial number of randomly created pipelines

    • iterations, (int, 1): Repetition number of optimization process

    • mutation_rate, (float, 0.1): Random changes from the previous population

    • recreation_rate, (float, 0.1): Rate of randomly created pipelines for next generation

    • survivor_rate, (float, 0.5): Ratio of the population that will be transferred to next generation

    • number_of_models_to_return, (int, 5): Number of pipelines that will return to user

    • allow_unknown, (bool, False): Allows creating unknown prediction results for the vectors. A vector is classified as an unknown if it cannot be recognized by any neurons.

    • fitness (dict): Fitness parameters are combination of statistical and cost variables. It is used to evaluate the performance of the models found by the algorithm. The user will define the coefficient scores for the parameters to define the priority of the parameters in the fitness score. Definition and options of the parameters are given below.

      • statistical variables:
        • accuracy: The degree of correctness of all vectors

        • f1_score: Measures of the test’s accuracy

        • precision: Proportion of positive identifications that is actually correct

        • sensitivity: Measures of the proportion of actual positives that are correctly identified

        • specificity: Measures of the proportion of actual negatives that are correctly identified

        • positive_predictive_rate: Ratio of true positive is the event that the test makes a positive prediction

      • cost variables:
        • neurons: Number of neurons that used in the model

        • features: Number of features that used in the model

  • lock (bool , False) – Ping for results every 30 seconds until the process finishes

  • silent (bool , True) – Silence status updates

Example

activity_data = client.datasets.load_activity_raw()
client.upload_dataframe('activity_data.csv', activity_data, force=True)

client.pipeline.reset()
client.pipeline.set_input_data('activity_data.csv',
                            data_columns = ['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
                            group_columns = ['Subject','Class', 'Rep'],
                            label_column = 'Class')

client.pipeline.add_transform("Windowing", params={"window_size": 100, "delta": 100 })

results, summary = client.pipeline.auto({ 'params':{"search_steps": ['selectorset', 'tvo'],
                                                "population_size": 10,
                                                "iterations": 1,
                                                "mutation_rate": 0.1,
                                                "recreation_rate": 0.1,
                                                "survivor_rate": 0.5,
                                                "number_of_models_to_return": 5,
                                                "run_parallel": True,
                                                "allow_unknown": False,
                                                "fitness": {'accuracy': 1.0,
                                                            'f1_score': 0.0,
                                                            'features': 0.3,
                                                            'precision': 0.0,
                                                            'sensitivity': 0.7,
                                                            'specificity': 0.0,
                                                            'positive_predictive_rate': 0.0},

                                                }})

results.summarize()
autosegment_search(params, run_parallel=True, lock=True, silent=True)

Execute auto segment search pipeline asynchronously.

Parameters
  • params (dict) – Automation parameters for segment search

  • run_parallel (bool , True) – Run in parallel in KB cloud

  • lock (bool , False) – Ping for results every 30 seconds until the process finishes

  • silent (bool , True) – Silence status updates

clear_cache()

Deletes the cache on KB cloud for this pipeline.

data(pipeline_step, page_index=0)

Retrieves results from a specific pipeline step in the pipeline from stored values in kbcloud after execution is performed.

Parameters
  • pipeline_step (int) – Pipeline step to retrieve results from

  • page_index (int) – Index of data to get

Returns

A ModelResultSet if the selected pipeline step is TVO step; otherwise, the output of the pipeline step is returned as a DataFrame.

delete_sandbox()

Clears the local pipeline steps, and deletes the sandbox from the KB cloud.

describe(show_params=True, show_set_params=False)

Prints out a description of the pipeline steps and parameters

Parameters

show_params (bool , True) – Include the parameters in the pipeline description

execute(wait_time=15, silent=True, describe=True, **kwargs)

Execute pipeline asynchronously and check for results

Parameters
  • wait_time (int , 10) – Time to wait in between status checks

  • silent (bool , True) – Silence status updates

features_to_tensor(feature_vectors, validate=0.1, test=0.1, label=None, feature_columns=None, class_map=None, dtype=<class 'numpy.int32'>, int8_input_type=True, shape=None, verbose=True)

Converts a MPLABML feature vector DataFrame into a train, test, validate set of Numpy arrays to use in training a tensorflow model

Parameters
  • feature_vectors (DataFrame) – DataFrame of feature vectors returned from MPLABML Pipeline

  • validate (float , optional) – Percentage of data to use in validation set. Defaults to 0.1.

  • test (float , optional) – Percentage of data to use in test set. Defaults to 0.1.

  • label (str , optional) – The label column for the feature vectors. Defaults to None.

  • feature_columns (list , optional) – The columns of the feature vector DataFrame containing features. Defaults to None.

  • class_map (dict , optional) – A class map describing the mapping of labels to int values. Defaults to None.

  • dtype (_type_ , optional) – The dtype of the tensorflow object to create. Defaults to np.int32.

  • int8_input_type (bool , optional) – Scales the MPLABML Features to int8 from uint8. Defaults to True.

  • shape (tuple , optional) – Reshape the DataFrame before loading it into the numpy array. Defaults to None.

Returns

A tuple containing numpy arrays for traning a TensorFlow model along with the class map x_train, x_test, x_validate, y_train, y_test, y_validate, class_map

get_automl_iteration_metrics()

Results of automl stats for each iteration

Returns

AutoML results stats

Return type

DataFrame

get_function_type(name)
Returns

The type of a function

Return type

str

get_knowledgepack(uuid)

Retrieve knowledgepack by uuid from the server

Parameters

uuid (str) – Unique identifier for knowledgepack

Returns

A knowledgepack object

Return type

TYPE

get_pipeline_length()
Returns

The current length of the pipeline

Return type

int

get_results(lock=False, wait_time=15, silent=False, page_index=0, renderer=None, **kwargs)

Retrieve status, results from the kb cloud for the current pipeline

Parameters
  • To retrieve. The default is the last type that was run. (results) –

  • lock (bool , False) – This will lock the process and continuously ping the KB cloud

  • The status of the pipeline process. (for) –

  • wait_time (int , 30) – The time to wait between individual status checks

  • silent (bool , False) – This will silence updates to every 4th update check

  • page_index (int , 0) – The page desired if the result is multi-paged (1-based)

Returns

This is the result of the last executed pipeline step. stats (dictionary): A dictionary containing the execution summary, features and other summary statistics

Return type

results (DataFrame or model result)

grid_search(grid_params, run_parallel=True, lock=True, silent=True)

Grid search is a parameter optimization method that exhaustively searches over a gridded parameter space. Grid search returns will return the score each parameter combination for f1, precision and sensitivity, so that you can choose the best performing combination to build a knowledge pack with.

Parameters
  • grid_params (dict) – Grid search parameters

  • run_parallel (bool , True) – Run grid search in parallel in KB cloud

  • lock (bool , False) – Ping for results every 30 seconds until the process finishes

grid_params is a nested Python dictionary object.

grid_params = {“Name Of Function”:{“Name of Parameter”:[ A, B, C]}}

Where A, B and C are the parameters to search over. Additionally, for each step, you may want to search over more than one of a function’s configurable parameters. To do this, add another element to the function’s dictionary.

grid_params = {“Name Of Function”:{“Name of Parameter 1”:[ A, B, C], “Name of Parameter 2”:[ D, E]}}

This will tell grid search to search over six different parameter spaces.

You can also specify more than one step to search over in grid params. This is done by adding another element to the function level of the grid_params dictionary.

grid_params = {“Name Of Function”:{“Name of Parameter 1”:[ A, B, C], “Name of Parameter 2”:[ D, E]}, “Name of Function 2”:{“Name of Parameter”:[1, 2, 3, 4, 5, 6]}}

Example

grid_params = {‘Windowing’:{“window_size”: [100,200],’delta’:[100]}, ‘selector_set’: {“Recursive Feature Elimination”:{‘number_of_features’:[10, 20]}}, ‘Hierarchical Clustering with Neuron Optimization’: {‘number_of_neurons’:[10,20]}}

results, stats = client.pipeline.grid_search(grid_params)

iterate_columns(fg, n_columns=None, columns=None)

Builds Multiple Feature generators by iterating over the input columns

Parameters
  • fg (dict) – Single input feature generator following the standard format

  • n_columns (int) – The number of columns to return as a combination

  • columns (list) – If None, uses the columns in the input feature generator; otherwise, uses this list provided

Example
>>> fg =  {'name':'Downsample with Min Max Scaling', 'params':{"columns": ['gyrZ','gyrX'] , "new_length": 15}}
>>> fg_new = client.pipeline.iterate_columns(fg, n_columns=1)
>>> print(fg_new)

Output: [{‘name’: ‘Downsample with Min Max Scaling’, ‘params’: {‘columns’: [‘gyrX’], ‘new_length’: 15}}, {‘name’: ‘Downsample with Min Max Scaling’, ‘params’: {‘columns’: [‘gyrX’], ‘new_length’: 15}}]

list_knowledgepacks()

Lists all of the projects on kb cloud associated with current pipeline

Returns

projects on kb cloud

Return type

DataFrame

rehydrate(model=None, replace=True, kp_summary=False, uuid=None)

Replace the executing cell with pipeline code for either a model or pipeline

Parameters
  • model (model , knowledgepack , None) – Pass in a model to build a pipeline from that

  • replace (boolean , True) – Replace the executing cell with pipeline code

rehydrate_knowledgepack(model=None, uuid=None, replace=True)

Replace the executing cell with pipeline code for a knowledge pack

Parameters

model (model , knowledgepack , None) – Pass in a model to build a pipeline from that

rehydrate_pipeline(model=None, uuid=None, replace=True)

Replace the executing cell with pipeline code for the current pipeline or pipeline that generated the model

Parameters

model (model , knowledgepack , None) – Pass in a model to build a pipeline from that

reset(delete_cache=False)

Reset the current pipeline steps

Parameters

delete_cache (bool , False) – Delete the cache from KB cloud

set_classifier(name, params={})

Classification method for the TVO step to use.

Parameters
  • name (str) – Name of the classification method

  • params (dict , optional) – Parameters of the classification method

set_columns(data_columns=None, group_columns=None, label_column=None)

Sets the columns for group_columns, data_columns and the label column to be used in the pipeline. This will automatically handle label column, ignore columns, group columns and passthrough columns for the majority of pipelines. For pipelines that need individually specified column attributes, set them in the step command.

Parameters
  • data_columns (None , list) – List of sensor data streams to use

  • group_columns (None , list) – List of columns to use when applying aggregate functions and defining unique subsets on which to operate

  • label_column (None , str) – The column name containing the ground truth label

set_input_capture(names, group_columns=None)

Use a data file that has been uploaded as your data source.

Parameters

name (str , list) – Single capture or list of captures file names to use in MPLABML cloud

set_input_data(name, data_columns=None, group_columns=None, label_column=None)

Use a data file that was uploaded as your data source

Parameters
  • name (str , list) – The name of the data file or list of datafiles in MPLABML cloud

  • data_columns (list , required) – Array of data streams to use in model building

  • group_columns (list , required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate

  • label_column (str , required) – The column with the true classification

set_input_features(name, feature_columns=None, group_columns=None, label_column=None)

Use a feature file as input to the pipeline (features were already generated in this case)

Parameters
  • name (str , list) – The name of the data file or list of feature files in MPLABML cloud

  • feature_columns (list , required) – The array of data streams to use in model building

  • group_columns (list , required) – The list of columns to use when applying aggregate functions and defining unique subsets to operate on

  • label_column (str , required) – The column with the true classification

set_input_query(name, use_session_preprocessor=True)

Set the input data to be a stored query.

Parameters
  • name (str) – The name of the saved query

  • use_session_preprocessor (bool) – Use the autosegmentation algorithms for this pipeline (if there is one). When this is true, the segmentation algorithm does not show up in the pipeline but is included in the firmware generation.

set_knowledgepack_platform(*args, **kwargs)

Backwards compatible call to set_device_configuration

set_training_algorithm(name, params={})

Training algorithm for the TVO step to use.

Parameters
  • name (str) – Name of the training algorithm

  • params (dict , optional) – Parameters of the training algorithm

set_tvo(params={})

Description of the train, validate optimize step, which consists of a training algorithm, validation method and classifier

Parameters

params (dict , optional) – Parameters of the TVO step

Example

>>> client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation', params={'number_of_folds':3})
>>> client.pipeline.set_classifier('PME', params={"classification_mode":'RBF','distance_mode':'L1'})
>>> client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization', params = {'number_of_neurons':10})
>>> client.pipeline.set_tvo({'label_column':'Label', 'ignore_columns': ['Subject', 'Rep']})
set_validation_method(name, params={})

Set the validation method for the tvo step.

Parameters
  • name (str) – Name of the validation method to use

  • params (dict , optional) – Parameters for the validation method

stop_pipeline()

Kills a pipeline that is running on KB cloud

submit(lock=False)

Submit a pipeline asynchronously to MPLABML Cloud for execution

to_list()

Converts the current pipeline into its list form

Returns

Return type

List

visualize_features(feature_vector, label_column=None)

Makes a plot of feature vectors by class to aid in understanding your model

Parameters

feature_vector (DataFrame) – DataFrame containing feature vectors and label column

visualize_neuron_array(model, feature_vector, featureX=None, featureY=None, neuron_alpha=0.2)

Makes a plot of feature vectors by class to aid in understanding your model

Parameters
  • model (model/knowledgepack) – The model or knowledgepack to use for plotting the neurons

  • feature_vector (DataFrame) – DataFrame containing feature vectors and label column

  • featureX (str) – The name of the feature for the x axis

  • featureY (str) – The name of the feature for the y axis