2.3.10 Pipelines
A pipeline is a container for a series of data processing steps. The pipeline object allows you to get an existing pipeline or create a new one with a given name. With this object, you can set input data sources, add transforms, feature generators, feature selectors, feature transforms and classifiers. An example pipeline is shown below.
Examples:
client.pipeline = 'my_pipeline'
client.pipeline.set_input_query('ExampleQuery')
client.pipeline.add_transform('Windowing')
# Feature Generation
client.pipeline.add_feature_generator(
[
{'subtype_call':'Time', 'params':{'sample_rate':100}},
{'subtype_call':'Rate of Change'},
{'subtype_call':'Statistical'},
{'subtype_call':'Energy'},
{'subtype_call':'Amplitude', 'params':{'smoothing_factor':9}}
],
function_defaults={'columns':sensor_columns},
)
# Scale to 8 bit representation for classification
client.pipeline.add_transform('Min Max Scale')
# Perform feature selection to remove highly correlated features, apply a
# variance threshold, and finally run recursive feature elimination.
# (Note: Recursive feature elimination can be very slow for large data sets and a
# large number of parameters; it is recommended to use other feature selection
# algorithms first to reduce the number of features.)
client.pipeline.add_feature_selector(
[
{"name": "Correlation Threshold","params":{'threshold':0.85}},
{"name": "Variance Threshold",
"params":{
'threshold':0.05
}
},
{"name":"Recursive Feature Elimination",
"params":{
"method":"Log R",
"number_of_features":20
}
},
],
params = {"number_of_features":20}
)
client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation',
params={'number_of_folds':3})
client.pipeline.set_classifier('PME',
params={'classification_mode':'RBF',
'distance_mode':'L1'
})
client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization',
params = {'number_of_neurons':10})
client.pipeline.set_tvo({'validation_seed':0})
results, stats = client.pipeline.execute()
The final step is where the pipeline is sent to the Cloud Engine for execution. When the job is completed, the results are returned to you as a model object if classification has occurred, or as a DataFrame if the pipeline stops at an intermediate step.
client.pipeline.execute()
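The type of the returned results depends on the last step that was executed. A minimal sketch of handling both cases (assuming the pipeline above has already been built) is shown below; the summarize() call mirrors the usage shown in the auto() example later in this section.
import pandas as pd

results, stats = client.pipeline.execute()

# An intermediate step (e.g. feature generation) returns a DataFrame of
# feature vectors; a pipeline ending in a TVO step returns a model result set.
if isinstance(results, pd.DataFrame):
    print(results.head())
else:
    results.summarize()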
- class mplabml.pipeline.Pipeline(kb, name=None)
Base class of a pipeline object
- add_augmentation(augmentation, params={})
Add augmentation set to the pipeline.
- Parameters
augmentation (List) – List of dictionaries containing time series augmentations
params (dict , {}) – Parameters of the augmentation set.
Examples
>>> client.pipeline.add_augmentation(
>>>         [
>>>             {"name": "Add Noise",
>>>              "params": {
>>>                  "scale": [0.05, 0.1, 0.15],
>>>                  "target_labels": ['idle', 'walking'],
>>>                  "target_sensors": ['AccelerometerX', 'GyroscopeX']
>>>              }}
>>>         ])
- add_feature_generator(feature_generators, params={}, function_defaults={}, return_generator_set=False)
Add a feature generator set to the pipeline.
- Parameters
feature_generators (list) – List of feature generators. As names or dictionaries.
params (dict , {}) – Parameters to apply to the feature generator set
function_defaults (dict , {}) – Parameters to apply to all individual feature generators
Examples
>>> # Add a single feature generator
>>> client.pipeline.add_feature_generator(
>>>         [{'name': 'Downsample', 'params': {'new_length': 5}},
>>>          {'name': 'Mean'}],
>>>         function_defaults={'columns': sensor_columns})
>>> # Call multiple functions by name when they use the same function defaults
>>> client.pipeline.add_feature_generator(
>>>         ['Mean', 'Standard Deviation', 'Skewness', 'Kurtosis',
>>>          '25th Percentile', '75th Percentile', '100th Percentile',
>>>          'Zero Crossing Rate'],
>>>         function_defaults={'columns': sensor_columns})
>>> # Call multiple functions using function defaults
>>> client.pipeline.add_feature_generator(
>>>         [{'name': 'Downsample', 'params': {'new_length': 5}},
>>>          {'name': 'Mean'}],
>>>         function_defaults={'columns': sensor_columns})
>>> # Call multiple functions by subtype which use different parameters;
>>> # note all subtypes will take the same inputs
>>> client.pipeline.add_feature_generator(
>>>         [{'subtype_call': 'Area', 'params': {'sample_rate': 100, 'smoothing_factor': 9}},
>>>          {'subtype_call': 'Time', 'params': {'sample_rate': 100}},
>>>          {'subtype_call': 'Rate of Change'},
>>>          {'subtype_call': 'Statistical'},
>>>          {'subtype_call': 'Energy'},
>>>          {'subtype_call': 'Amplitude', 'params': {'smoothing_factor': 9}},
>>>          {'subtype_call': 'Physical', 'params': {'sample_rate': 100}}],
>>>         function_defaults={'columns': sensor_columns})
>>> # Mix subtype and specify additional feature generators
>>> client.pipeline.add_feature_generator(
>>>         [{'subtype_call': 'Statistical'},
>>>          {'name': 'Downsample', 'params': {'new_length': 5}}],
>>>         function_defaults={'columns': sensor_columns})
>>> # Call the same feature generator multiple times with different parameters
>>> client.pipeline.add_feature_generator(
>>>         [{'name': 'Downsample', 'params': {'new_length': 5, 'columns': sensor_columns[0]}},
>>>          {'name': 'Downsample', 'params': {'new_length': 12}}],
>>>         function_defaults={'columns': sensor_columns})
- add_feature_selector(feature_selectors, params={})
Add a feature selection set to the pipeline.
- Parameters
feature_selectors (List) – List of dictionaries containing feature selectors
params (dict , {}) – Parameters of the feature selector set
Examples
>>> client.pipeline.add_feature_selector(
>>>         [{"name": "Recursive Feature Elimination", "params": {"method": "Log R"}}],
>>>         params={"number_of_features": 20})
- add_linear_step(func)
Add a step to the pipeline. Automatically tie the previous step and current step.
- Parameters
func (function) – An mplabml function method call
- add_segmenter(name, params={})
Add a Segmenter to the pipeline.
- Parameters
name (str) – Name of the segmenter method to add
params (dict , optional) – Dictionary containing the parameters of the transform
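Example
A minimal sketch of adding a segmentation step; the segmenter name and parameter below are hypothetical placeholders, not names confirmed by this reference:
>>> # 'ExampleSegmenter' and 'window_size' are placeholders; substitute a segmenter
>>> # available on your server and its documented parameters
>>> client.pipeline.add_segmenter('ExampleSegmenter', params={'window_size': 100})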
- add_transform(name, params={}, overwrite=True)
Add a Transform to the pipeline.
- Parameters
name (str) – Name of the transform method to add
params (dict , optional) – Dictionary containing the parameters of the transform
overwrite (boolean) – When adding multiple instances of the same transform, set overwrite to False for the additional steps so that earlier instances are not overwritten (default is True)
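Example
A minimal sketch of adding the same transform twice without overwriting the first instance; the Windowing parameters mirror the auto() example below:
>>> client.pipeline.add_transform("Windowing", params={"window_size": 100, "delta": 100})
>>> client.pipeline.add_transform("Windowing", params={"window_size": 200, "delta": 200}, overwrite=False)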
- auto(auto_params, run_parallel=True, lock=True, silent=True, renderer=None, version=2)
Execute automated pipeline asynchronously
The automated pipeline finds optimal parameters with a genetic algorithm. The genetic algorithm starts with a user-defined randomized population of pipelines, generates and tests models from them, keeps a subset of high-performing combinations, then recombines those survivors and repeats the process. The offspring of good parameter combinations are usually also good, and sometimes significantly better than their parents. As the algorithm iterates through successive generations, it often finds a near-optimal model without trying as many configurations. The algorithm terminates when the desired number of iterations is complete.
The created pipelines are run in parallel in MPLABML servers and results are ranked by the fitness score, which takes into account the model’s F1 score, precision, sensitivity and other metrics. The automation options and definition of the performance metrics are given below.
- Parameters
- params (dict) – Parameters of the genetic algorithm used to optimize the pipelines. Definitions and options of the parameters are given below.
search_steps, (list, ['selectorset', 'tvo']): Defines which steps of the pipeline will be optimized
population_size, (int, 10): Initial number of randomly created pipelines
iterations, (int, 1): Number of iterations of the optimization process
mutation_rate, (float, 0.1): Rate of random changes applied to the previous population
recreation_rate, (float, 0.1): Rate of randomly created pipelines in the next generation
survivor_rate, (float, 0.5): Ratio of the population that will be transferred to the next generation
number_of_models_to_return, (int, 5): Number of pipelines that will be returned to the user
allow_unknown, (bool, False): Allows unknown prediction results for vectors. A vector is classified as unknown if it cannot be recognized by any neurons.
fitness (dict): Fitness parameters are a combination of statistical and cost variables used to evaluate the performance of the models found by the algorithm. The user defines coefficient scores for the parameters to set their priority in the fitness score. Definitions and options of the parameters are given below.
- statistical variables:
accuracy: The degree of correctness across all vectors
f1_score: Measure of the test's accuracy
precision: Proportion of positive identifications that are actually correct
sensitivity: Measure of the proportion of actual positives that are correctly identified
specificity: Measure of the proportion of actual negatives that are correctly identified
positive_predictive_rate: Ratio of true positives among all positive predictions made by the test
- cost variables:
neurons: Number of neurons used in the model
features: Number of features used in the model
lock (bool , True) – Ping for results every 30 seconds until the process finishes
silent (bool , True) – Silence status updates
Example
activity_data = client.datasets.load_activity_raw()
client.upload_dataframe('activity_data.csv', activity_data, force=True)
client.pipeline.reset()
client.pipeline.set_input_data('activity_data.csv',
                               data_columns=['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
                               group_columns=['Subject', 'Class', 'Rep'],
                               label_column='Class')
client.pipeline.add_transform("Windowing", params={"window_size": 100, "delta": 100})
results, summary = client.pipeline.auto({
    'params': {"search_steps": ['selectorset', 'tvo'],
               "population_size": 10,
               "iterations": 1,
               "mutation_rate": 0.1,
               "recreation_rate": 0.1,
               "survivor_rate": 0.5,
               "number_of_models_to_return": 5,
               "run_parallel": True,
               "allow_unknown": False,
               "fitness": {'accuracy': 1.0,
                           'f1_score': 0.0,
                           'features': 0.3,
                           'precision': 0.0,
                           'sensitivity': 0.7,
                           'specificity': 0.0,
                           'positive_predictive_rate': 0.0},
               }})
results.summarize()
- autosegment_search(params, run_parallel=True, lock=True, silent=True)
Execute auto segment search pipeline asynchronously.
- Parameters
params (dict) – Automation parameters for segment search
run_parallel (bool , True) – Run in parallel in KB cloud
lock (bool , True) – Ping for results every 30 seconds until the process finishes
silent (bool , True) – Silence status updates
- clear_cache()
Deletes the cache on KB cloud for this pipeline.
- data(pipeline_step, page_index=0)
Retrieves the results of a specific pipeline step from the values stored in KB cloud after execution has been performed.
- Parameters
pipeline_step (int) – Pipeline step to retrieve results from
page_index (int) – Index of data to get
- Returns
A ModelResultSet if the selected pipeline step is the TVO step; otherwise, the output of the pipeline step is returned as a DataFrame.
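Example
A minimal sketch of retrieving the output of an intermediate step after execution; the step index here is illustrative:
>>> # Retrieve the output of the third pipeline step (index 2).
>>> # For non-TVO steps this is a DataFrame.
>>> step_data = client.pipeline.data(2, page_index=0)
>>> step_data.head()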
- delete_sandbox()
Clears the local pipeline steps, and deletes the sandbox from the KB cloud.
- describe(show_params=True, show_set_params=False)
Prints out a description of the pipeline steps and parameters
- Parameters
show_params (bool , True) – Include the parameters in the pipeline description
- execute(wait_time=15, silent=True, describe=True, **kwargs)
Execute pipeline asynchronously and check for results
- Parameters
wait_time (int , 15) – Time to wait in between status checks
silent (bool , True) – Silence status updates
- features_to_tensor(feature_vectors, validate=0.1, test=0.1, label=None, feature_columns=None, class_map=None, dtype=<class 'numpy.int32'>, int8_input_type=True, shape=None, verbose=True)
Converts an MPLABML feature vector DataFrame into train, test, and validate sets of NumPy arrays for use in training a TensorFlow model
- Parameters
feature_vectors (DataFrame) – DataFrame of feature vectors returned from MPLABML Pipeline
validate (float , optional) – Percentage of data to use in validation set. Defaults to 0.1.
test (float , optional) – Percentage of data to use in test set. Defaults to 0.1.
label (str , optional) – The label column for the feature vectors. Defaults to None.
feature_columns (list , optional) – The columns of the feature vector DataFrame containing features. Defaults to None.
class_map (dict , optional) – A class map describing the mapping of labels to int values. Defaults to None.
dtype (type , optional) – The dtype of the TensorFlow object to create. Defaults to np.int32.
int8_input_type (bool , optional) – Scales the MPLABML Features to int8 from uint8. Defaults to True.
shape (tuple , optional) – Reshape the DataFrame before loading it into the numpy array. Defaults to None.
- Returns
A tuple containing NumPy arrays for training a TensorFlow model along with the class map: x_train, x_test, x_validate, y_train, y_test, y_validate, class_map
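Example
A minimal sketch, assuming feature_vectors is a feature DataFrame returned by the pipeline and 'Label' is your label column:
>>> tensors = client.pipeline.features_to_tensor(feature_vectors, validate=0.1, test=0.1, label='Label')
>>> x_train, x_test, x_validate, y_train, y_test, y_validate, class_map = tensors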
- get_automl_iteration_metrics()
Results of automl stats for each iteration
- Returns
AutoML results stats
- Return type
DataFrame
- get_function_type(name)
- Returns
The type of a function
- Return type
str
- get_knowledgepack(uuid)
Retrieve knowledgepack by uuid from the server
- Parameters
uuid (str) – Unique identifier for knowledgepack
- Returns
A knowledgepack object
- Return type
KnowledgePack
- get_pipeline_length()
- Returns
The current length of the pipeline
- Return type
int
- get_results(lock=False, wait_time=15, silent=False, page_index=0, renderer=None, **kwargs)
Retrieve status, results from the kb cloud for the current pipeline
- Parameters
results (str , optional) – The type of results to retrieve; defaults to the last step type that was run
lock (bool , False) – This will lock the process and continuously ping the KB cloud for the status of the pipeline process
wait_time (int , 15) – The time to wait between individual status checks
silent (bool , False) – This will limit status updates to every 4th check
page_index (int , 0) – The page desired if the result is multi-paged (1-based)
page_index (int , 0) – The page desired if the result is multi-paged (1-based)
- Returns
results (DataFrame or model result) – The result of the last executed pipeline step
stats (dict) – A dictionary containing the execution summary, features and other summary statistics
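Example
A minimal sketch of submitting a pipeline and then polling for its results (see submit() below):
>>> client.pipeline.submit()
>>> results, stats = client.pipeline.get_results(lock=True, wait_time=15)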
- grid_search(grid_params, run_parallel=True, lock=True, silent=True)
Grid search is a parameter optimization method that exhaustively searches over a gridded parameter space. It returns the f1, precision, and sensitivity scores for each parameter combination so that you can choose the best-performing combination to build a knowledge pack with.
- Parameters
grid_params (dict) – Grid search parameters
run_parallel (bool , True) – Run grid search in parallel in KB cloud
lock (bool , True) – Ping for results every 30 seconds until the process finishes
grid_params is a nested Python dictionary object.
grid_params = {"Name Of Function": {"Name of Parameter": [A, B, C]}}
Where A, B and C are the parameters to search over. Additionally, for each step, you may want to search over more than one of a function’s configurable parameters. To do this, add another element to the function’s dictionary.
grid_params = {"Name Of Function": {"Name of Parameter 1": [A, B, C], "Name of Parameter 2": [D, E]}}
This tells grid search to evaluate six different parameter combinations.
You can also specify more than one step to search over in grid params. This is done by adding another element to the function level of the grid_params dictionary.
grid_params = {"Name Of Function": {"Name of Parameter 1": [A, B, C], "Name of Parameter 2": [D, E]}, "Name of Function 2": {"Name of Parameter": [1, 2, 3, 4, 5, 6]}}
Example
grid_params = {'Windowing': {"window_size": [100, 200], 'delta': [100]},
               'selector_set': {"Recursive Feature Elimination": {'number_of_features': [10, 20]}},
               'Hierarchical Clustering with Neuron Optimization': {'number_of_neurons': [10, 20]}}
results, stats = client.pipeline.grid_search(grid_params)
- iterate_columns(fg, n_columns=None, columns=None)
Builds multiple feature generators by iterating over the input columns
- Parameters
fg (dict) – Single input feature generator following the standard format
n_columns (int) – The number of columns to return as a combination
columns (list) – If None, uses the columns in the input feature generator; otherwise, uses this list provided
- Example
>>> fg = {'name': 'Downsample with Min Max Scaling',
>>>       'params': {"columns": ['gyrZ', 'gyrX'], "new_length": 15}}
>>> fg_new = client.pipeline.iterate_columns(fg, n_columns=1)
>>> print(fg_new)
Output: [{'name': 'Downsample with Min Max Scaling', 'params': {'columns': ['gyrZ'], 'new_length': 15}}, {'name': 'Downsample with Min Max Scaling', 'params': {'columns': ['gyrX'], 'new_length': 15}}]
- list_knowledgepacks()
Lists all of the Knowledge Packs on KB cloud associated with the current pipeline
- Returns
Knowledge Packs on KB cloud
- Return type
DataFrame
- rehydrate(model=None, replace=True, kp_summary=False, uuid=None)
Replace the executing cell with pipeline code for either a model or pipeline
- Parameters
model (model , knowledgepack , None) – Pass in a model or Knowledge Pack to build the pipeline code from
replace (boolean , True) – Replace the executing cell with pipeline code
- rehydrate_knowledgepack(model=None, uuid=None, replace=True)
Replace the executing cell with pipeline code for a knowledge pack
- Parameters
model (model , knowledgepack , None) – Pass in a model or Knowledge Pack to build the pipeline code from
- rehydrate_pipeline(model=None, uuid=None, replace=True)
Replace the executing cell with pipeline code for the current pipeline or pipeline that generated the model
- Parameters
model (model , knowledgepack , None) – Pass in a model or Knowledge Pack to build the pipeline code from
- reset(delete_cache=False)
Reset the current pipeline steps
- Parameters
delete_cache (bool , False) – Delete the cache from KB cloud
- set_classifier(name, params={})
Classification method for the TVO step to use.
- Parameters
name (str) – Name of the classification method
params (dict , optional) – Parameters of the classification method
- set_columns(data_columns=None, group_columns=None, label_column=None)
Sets the columns for group_columns, data_columns and the label column to be used in the pipeline. This will automatically handle label column, ignore columns, group columns and passthrough columns for the majority of pipelines. For pipelines that need individually specified column attributes, set them in the step command.
- Parameters
data_columns (None , list) – List of sensor data streams to use
group_columns (None , list) – List of columns to use when applying aggregate functions and defining unique subsets on which to operate
label_column (None , str) – The column name containing the ground truth label
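Example
A minimal sketch of setting the pipeline columns; the column names mirror the auto() example above:
>>> client.pipeline.set_columns(
>>>         data_columns=['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
>>>         group_columns=['Subject', 'Class', 'Rep'],
>>>         label_column='Class')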
- set_input_capture(names, group_columns=None)
Use a data file that has been uploaded as your data source.
- Parameters
names (str , list) – Single capture file name or list of capture file names to use from MPLABML cloud
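Example
A minimal sketch; the capture file names and group columns below are illustrative placeholders for captures already uploaded to your project:
>>> client.pipeline.set_input_capture(['Capture_01.csv', 'Capture_02.csv'],
>>>                                   group_columns=['Subject', 'Class', 'Rep'])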
- set_input_data(name, data_columns=None, group_columns=None, label_column=None)
Use a data file that was uploaded as your data source
- Parameters
name (str , list) – The name of the data file or list of datafiles in MPLABML cloud
data_columns (list , required) – Array of data streams to use in model building
group_columns (list , required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate
label_column (str , required) – The column with the true classification
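Example
A minimal sketch of using an uploaded data file as the pipeline input; this mirrors the auto() example above:
>>> client.pipeline.set_input_data('activity_data.csv',
>>>         data_columns=['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
>>>         group_columns=['Subject', 'Class', 'Rep'],
>>>         label_column='Class')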
- set_input_features(name, feature_columns=None, group_columns=None, label_column=None)
Use a feature file as input to the pipeline (features were already generated in this case)
- Parameters
name (str , list) – The name of the data file or list of feature files in MPLABML cloud
feature_columns (list , required) – The array of data streams to use in model building
group_columns (list , required) – The list of columns to use when applying aggregate functions and defining unique subsets to operate on
label_column (str , required) – The column with the true classification
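Example
A minimal sketch; the feature file name and feature column names below are illustrative placeholders:
>>> client.pipeline.set_input_features('activity_features.csv',
>>>         feature_columns=['gen_0001_accelx_Mean', 'gen_0002_accely_Mean'],
>>>         group_columns=['Subject', 'Class', 'Rep'],
>>>         label_column='Class')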
- set_input_query(name, use_session_preprocessor=True)
Set the input data to be a stored query.
- Parameters
name (str) – The name of the saved query
use_session_preprocessor (bool) – Use the session's preprocessor (autosegmentation) algorithm for this pipeline, if one exists. When True, the segmentation algorithm does not show up in the pipeline but is included in the firmware generation.
- set_knowledgepack_platform(*args, **kwargs)
Backwards compatible call to set_device_configuration
- set_training_algorithm(name, params={})
Training algorithm for the TVO step to use.
- Parameters
name (str) – Name of the training algorithm
params (dict , optional) – Parameters of the training algorithm
- set_tvo(params={})
Description of the train, validate, optimize (TVO) step, which consists of a training algorithm, a validation method, and a classifier
- Parameters
params (dict , optional) – Parameters of the TVO step
Example
>>> client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation', params={'number_of_folds': 3})
>>> client.pipeline.set_classifier('PME', params={"classification_mode": 'RBF', 'distance_mode': 'L1'})
>>> client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization', params={'number_of_neurons': 10})
>>> client.pipeline.set_tvo({'label_column': 'Label', 'ignore_columns': ['Subject', 'Rep']})
- set_validation_method(name, params={})
Set the validation method for the TVO step.
- Parameters
name (str) – Name of the validation method to use
params (dict , optional) – Parameters for the validation method
- stop_pipeline()
Kills a pipeline that is running on KB cloud
- submit(lock=False)
Submit a pipeline asynchronously to MPLABML Cloud for execution
- to_list()
Converts the current pipeline into its list form
- Returns
The current pipeline in list form
- Return type
List
- visualize_features(feature_vector, label_column=None)
Makes a plot of feature vectors by class to aid in understanding your model
- Parameters
feature_vector (DataFrame) – DataFrame containing feature vectors and label column
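Example
A minimal sketch, assuming the pipeline was executed up to feature generation and 'Label' is your label column:
>>> feature_vectors, stats = client.pipeline.execute()
>>> client.pipeline.visualize_features(feature_vectors, label_column='Label')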
- visualize_neuron_array(model, feature_vector, featureX=None, featureY=None, neuron_alpha=0.2)
Makes a plot of the model's neuron array along with feature vectors by class to aid in understanding your model
- Parameters
model (model/knowledgepack) – The model or knowledgepack to use for plotting the neurons
feature_vector (DataFrame) – DataFrame containing feature vectors and label column
featureX (str) – The name of the feature for the x axis
featureY (str) – The name of the feature for the y axis
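Example
A minimal sketch, assuming model is returned from a completed TVO step and feature_vectors is a feature DataFrame; the feature names below are illustrative placeholders:
>>> client.pipeline.visualize_neuron_array(model, feature_vectors,
>>>         featureX='gen_0001_accelx_Mean', featureY='gen_0002_accely_Mean')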