Configuring a pipeline
After you have created a pipeline, you can start configuring it.
A new pipeline version is created automatically for every saved change.
Refer to Managing pipeline versions.
Prerequisites
To execute the following actions, you must be assigned the Admin role.
The Pipeline App has the status STOPPED unless otherwise stated.
Changing the basic information
You can change the Pipeline App Name and/or Pipeline Description in the Basic Information pane of App Settings.
Proceed as follows
Enter a new name in the Pipeline App Name field.
Enter a new description in the Pipeline Description field.
Click Save.
→ The name and/or description of the Pipeline App has been changed.
→ A new pipeline version has been automatically created. Refer to Managing pipeline versions.
Configuring processes and instances
The table in the Processes and Instances pane lists the individual instances together with their CPU, Memory Usage, and Disk Usage. If an instance is currently not running, the corresponding columns display an icon instead of values.
The table also displays the Global Processing Timeout in minutes and the Multithreaded Processing status, along with the number of threads.
You can scale your Pipeline App by creating additional instances if it requires more memory or computation power. You can either increase the number of instances that should run or order more memory per instance.
Increasing the number of instances and the memory usage affects your account. Check the service plans and our pricing model for detailed information. The pricing can be found in the marketplace where you booked your service plan.
You can also configure the global processing timeout value and choose between single-threaded and multithreaded processing.
Prerequisites
To update the number of instances, memory usage, and disk limit (steps 2, 3, and 4 below), the Pipeline App must have the status STARTED.
Proceed as follows
Click the Settings icon in the Processes and Instances pane to scale the Pipeline App and to configure Advanced Settings.
→ The Scale Pipeline App dialog is displayed.
In the Instances field, enter the number of instances you want to add.
In the Memory Usage drop-down list, select a value to adjust the memory usage.
In the Disk Limit drop-down list, select a value to adjust the disk limit.
Click Advanced Settings to access the following additional configuration options:
In the Default processing step timeout (in minutes) field, enter a value in minutes, which will be applied to the processing steps if no other timeout is defined.
The default value is one minute. The minimum value is also one minute, whereas the maximum value is 24 hours, defined in minutes.
In the Threading configuration area, choose between Single threaded and Multithreaded (default) processing.
The default number of threads is five, which equals five input files.
Although you can change these advanced configuration settings, we do not recommend doing so, as this could lead to unforeseen behavior and issues.
Click the Apply Changes button to save your changes.
→ The Pipeline App has been scaled.
→ The Advanced configuration settings for processing timeout and threading are applied.
→ A new pipeline version has been automatically created. Refer to Managing pipeline versions.
Adding environment variables
Environment variables behave like standard system variables. Using environment variables, you can set parameters in the User-Provided Environment Variables pane to have them available in your application code.
If you enter a valid environment variable, it is saved automatically to the backend.
Proceed as follows
Click the Add Environment Variable button in the User-Provided Environment Variables pane.
→ A new line is added.
Enter a Name for the environment variable.
Enter a Value for the environment variable.
→ The environment variable has been set.
Click the Save button.
→ A new pipeline version has been automatically created. Refer to Managing pipeline versions.
Example
In the following image, you see that the environment variable APP_ENVIRONMENT=DEV has been set. The SFDE_PIPELINE environment variable is set by default and cannot be changed.
Depending on the programming language you use, this environment variable can be used inside the application.
In a Java application, you can print the variable as follows:
System.out.println(System.getenv("APP_ENVIRONMENT"));
// -> DEV
In a Python script, you can print the variable as follows:
import os

print(os.environ['APP_ENVIRONMENT'])
# -> DEV
Configuring the pipeline steps
Each pipeline consists of three steps:
Input trigger
The input trigger step is the first step of every pipeline. It can only exist once per pipeline. The defined query filters the incoming data for processing.
Parser
The parser is used to transform the input data. A default parser is available that automatically detects the file type of the input data and transforms the input data accordingly. Multiple parser steps can be added.
Output
The output step is the last step of every pipeline. It can only exist once per pipeline.
Each pipeline can be expanded by the following steps:
Parser
See above.
Custom
In a custom step, you can upload your own code written in Python or Java to interact with and transform the input data (see the sketch after this list).
In the Examples chapter, you can find information on how to configure the custom step; refer to Pipelines: Configuring the custom step.
The custom processing logic and its Intellectual Property (IP) belong to the project owner.
The project owner is liable for damages caused by the custom processing logic.
We advise you to perform security and validity scans (especially when using open source code).
We also advise you to verify the open source license compliance of your custom processing logic.
Device Position
In a device position step, the GPS coordinates of a device are extracted and stored in its digital twin representation.
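The following is a minimal, purely hypothetical sketch of the kind of transformation logic a custom step might contain. The entry point name, the structure of the injected document, and the field names are assumptions for illustration only; the actual interface and packaging are described in Pipelines: Configuring the custom step.
# Hypothetical custom step logic: the function name and the document structure
# are assumptions, not the platform's actual interface.
import json

def process(input_document: dict) -> dict:
    """Convert a temperature reading from Fahrenheit to Celsius (illustrative only)."""
    payload = input_document.get("payload", {})
    if "temperatureF" in payload:
        payload["temperatureC"] = round((payload["temperatureF"] - 32) * 5 / 9, 2)
    return input_document

if __name__ == "__main__":
    # Stand-alone test with a made-up input document
    doc = {"payload": {"temperatureF": 98.6}, "metaData": {"deviceId": "device-1"}}
    print(json.dumps(process(doc)))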
Proceed as follows
Open the Configuration tab and configure or update the relevant step.
When ready, click the Save button.
→ The configuration is saved.
→ A pipeline version is automatically created and can be tracked in the Change History tab. Refer to Managing pipeline versions.
Configuring the input trigger
Proceed as follows
In the Configuration tab, select the Input Trigger item in the sequence.
In the Collection field, enter the collection that contains your input data if you have more than one collection.
Set a trigger for the pipeline:
Activate the Match all inserted files radio button to process all files uploaded to the database.
Activate the Custom Filter radio button to trigger the pipeline only if the input data matches specific requirements.
Insert a query into the code box.
The syntax is based on the MongoDB Query Language.
Click the Save button if you want to stop here, or continue with the next step.
Processing of duplicates
If you do not want to process duplicates, enter the following query:
{ "tags": { "$not": { "$elemMatch": { "$eq": "duplicate" } } } , "metaData.yourKey": "yourMatchText"}
If you want to process duplicates, enter the following query:
{ "metaData.yourKey": "yourMatchText"}
Configuring the parser
Proceed as follows
Select the Parser item in the sequence.
Optionally, click the Edit icon to change the parser's name.
In the Parser Type drop-down list, select a parser type, e.g. ZIP file, JSON, Vector DBC, etc.
Some parser types are described in detail below.
For optimal performance, we recommend a data size of less than 500 MB.
→ The output is a JSON file created from the content of the input data.
ASAM MDF parser step
This parser is used for the Measurement Data Format and is best suited for a timeseries output collection.
Proceed as follows
In the Parser Type drop-down list, select the ASAM MDF parser type.
Add the environment setting MDF_IGNORE_UNKOWN_BLOCKS=1 to ignore unknown blocks inside the MDF file while parsing. With this setting, the decoder only displays a warning instead of stopping the parsing process when it encounters special MDF blocks.
Select a time series collection, which should be configured as follows:
The Time field must be the same as the time attribute of the MDF file.
The Meta field should be defined and will be set to inputDataId.
If a time series collection is selected, the parser step output is flattened and looks similar to the following:
[{
  <TimeField>: '2020-01-05T10:00:01.000Z',
  'dwordCounter': 10,
  'PWMFiltered': 99,
  'Triangle': 37,
  'inputDataId': '5e1729cb84874f0015dd5014',
  'fileName': 'Vector_MinimumFile.MF4',
  'file': 'generic',
  'Timeclass': 'LocalPCreferencetime',
  'Startdistance': 0,
  'processedAt': '2020-01-09T13:25:31.554Z',
  'Starttime': '2011-08-24T13:53:19Z',
  'DataGroupBlock': 1,
  'receivedAt': '2020-01-09T13:25:31.483Z',
  'Startangle': 0,
  'Name': 'MinimumMDF4.1filederivedfromCANapefilebyremovingsomeblocks',
  <MetaField>: '5e1729cb84874f0015dd5014'
},
{
  <TimeField>: '2020-01-05T10:00:02.000Z',
  'dwordCounter': 124637,
  'PWMFiltered': 110,
  'Triangle': 47,
  'inputDataId': '5e1729cb84874f0015dd5014',
  'fileName': 'Vector_MinimumFile.MF4',
  'file': 'generic',
  'Timeclass': 'LocalPCreferencetime',
  'Startdistance': 0,
  'processedAt': '2020-01-09T13:25:31.554Z',
  'Starttime': '2011-08-24T13:53:19Z',
  'DataGroupBlock': 1,
  'receivedAt': '2020-01-09T13:25:31.483Z',
  'Startangle': 0,
  'Name': 'MinimumMDF4.1filederivedfromCANapefilebyremovingsomeblocks',
  <MetaField>: '5e1729cb84874f0015dd5014'
}]
Vector DBC parser step
To parse input data of the type DBC, the CAN configuration must be customized with a regular expression. In addition, metadata can be matched with expressions and labeled accordingly. There is also a built-in text editor to test these expressions against the input data.
Proceed as follows
In the Parser Type drop-down list, select the Vector DBC parser type.
Optionally, click the Edit icon to change the parser's name.
In the attachment box, select or drop a ZIP file containing the input data of the type DBC.
In the Vector DBC Parser Settings pane, enter a Regular Expression (an illustrative example follows this procedure).
Define the DeltaTime Group, CAN ID Group, and CAN Payload Group in the corresponding drop-down lists.
Activate the CAN v2.0 checkbox to enable the Message ID to be 29 bits long.
CAN v2.0 is activated by default. If you deactivate the checkbox, CAN v1.0 is used with a length of 11 bits.
Activate the HEX Encoded checkbox to use HEX values.
HEX Encoded is activated by default. If you deactivate the checkbox, integer values are used.
Optionally, in the Metadata Fields pane, click the Add Metadata button to add a Metadata trace selection.
→ A new line is added.
You can add a maximum of 10 metadata parameters.
In the CAN Trace Evaluation pane, click the Test Regex button to test your regular expressions and the metadata rules with an example CAN trace.
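For illustration, assume an ASCII CAN trace in which each line contains a relative timestamp, a hexadecimal CAN ID, and the payload bytes. The trace format, the expression, and the Python test below are assumptions only and must be adapted to your actual trace files; group 1 would then be selected as the DeltaTime Group, group 2 as the CAN ID Group, and group 3 as the CAN Payload Group.
# Hypothetical trace line: "<delta time> <CAN ID> <payload>", e.g. "0.000100 18FF45E6 0102030405060708"
import re

# Group 1 = DeltaTime, group 2 = CAN ID, group 3 = CAN payload (assumed format)
pattern = re.compile(r"^\s*(\d+\.\d+)\s+([0-9A-Fa-f]+)\s+([0-9A-Fa-f]+)\s*$")

match = pattern.match("0.000100 18FF45E6 0102030405060708")
if match:
    print(match.group(1), match.group(2), match.group(3))
The CAN Trace Evaluation pane serves the same purpose directly in the UI: it validates the expression and the metadata rules against a sample trace.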
FIBEX parser step
To parse input data of the type Fibex, the configuration must be customized with a regular expression. In addition, metadata can be matched with expressions and labeled accordingly. There is also a built-in text editor to test these expressions against the input data.
Proceed as follows
In the Parser Type drop-down list, select the Fibex parser type.
Optionally, click the Edit icon to change the parser's name.
In the attachment box, select or drop a ZIP file containing the input data of the type Fibex(XML).
In the FIBEX Parser Settings pane, enter a Regular Expression.
Define the DeltaTime Group, PDU ID, and PDU ID Group in the corresponding drop-down lists.
Activate the HEX Encoded checkbox to use HEX values.
HEX Encoded is activated by default. If you deactivate the checkbox, integer values are used.
Optionally, in the Metadata Fields pane, click the Add Metadata button to add a Metadata trace selection.
→ A new line is added.
You can add a maximum of 10 metadata parameters.
In the Fibex Trace Evaluation pane, click the Test Regex button to test your regular expressions and the metadata rules with an example FIBEX trace.
iTraMS parser step
Proceed as follows
In the Parser Type drop-down list, select the iTraMS parser type.
Optionally, click the Edit icon to change the parser's name.
In the attachment box, select or drop a ZIP file containing the iTraMS decoder specification.
Optionally, test the iTraMS decoder. To do so, select or drop a .bin file in the attachment box of the Try Out section. If the file was decoded successfully, the processed data is shown.
Configuring the output
Proceed as follows
Select the Output item in the sequence.
In the Output Collection drop-down list, select a collection in which the results should be stored.
To create a new collection, refer to Creating a new collection.
If the pipeline contains an iTraMS parser, an iTraMS Time Bucketing section is displayed. Optionally, select a time bucket size or turn time bucketing off completely.
The output data must not exceed 16 MB per document; otherwise, the document cannot be saved to the database.
Creating a new collection
There are two subtypes of collections:
Standard
For regular use cases
Time Series
To store sequences of measurements
We recommend using documents with a flat data structure. If a parser step is used in the pipeline, the flat data structure is applied. The flat data structure prevents the generation of the sub-entries "metadata" and "payload", which are otherwise generated by default. This means that these two items are missing from the output JSON.
Proceed as follows
Click the New Collection button.
→ The Create New Collection dialog is displayed.
In the Technical Name field, enter a technical name for the collection.
In the Label field, enter a label that will be displayed.
In the SubType drop-down list, select the subtype of the new collection.
If you selected Time Series, proceed as follows (see also the illustrative example after this procedure):
In the Time Field field, enter the field that contains the date in the time series document.
The time field must be on root level.
In the Meta Field field, enter a field name that contains metadata, e.g. deviceId.
Only top-level fields are allowed as metadata.
In the Expire after days field, define the number of days after which the documents will be deleted.
In the Granularity drop-down list, select the unit that best matches the time interval between individual sensor measurements.
Click the Create button.
→ The new collection is created.
→ A new query template for this collection is created, refer to Default query template for time series collections.
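The Time Field, Meta Field, Expire after days, and Granularity settings correspond conceptually to the options of a native MongoDB time series collection. The following pymongo sketch is for illustration only and assumes this correspondence; the platform creates the collection for you via the dialog, and all names and values below are placeholders.
# Illustrative only: the platform creates the collection through the Create New Collection dialog.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
db = client["pipeline_output"]                     # placeholder database name

db.create_collection(
    "measurements",                  # placeholder collection name
    timeseries={
        "timeField": "receivedAt",   # Time Field (must be on root level)
        "metaField": "inputDataId",  # Meta Field (top-level field)
        "granularity": "seconds",    # Granularity
    },
    expireAfterSeconds=30 * 24 * 60 * 60,  # Expire after days (here: 30 days)
)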
Processing large files from raw data storage
We recommend processing large files (100 MB and above) by using streaming via a custom step. Depending on the program logic, this may be required to prevent memory congestion.
To do so, you need to create a custom step that can handle two types of input files. First, change the configuration in the manifest to get payload references for large files (>10 KB); small files are directly embedded in the injected data. Then configure the corresponding custom step. For more information on how to configure such a step, refer to the code examples and the corresponding README files in Pipelines: Configuring the custom step.
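As a rough illustration of the streaming idea, and not of the platform's actual payload reference API, the following sketch downloads a file referenced by a URL in chunks instead of loading it into memory at once; the URL, chunk size, and per-chunk processing are placeholders.
# Illustrative streaming download: the payload is processed chunk by chunk,
# so the whole file never has to fit into memory. The URL is a placeholder.
import requests

def stream_payload(url: str, chunk_size: int = 1024 * 1024) -> int:
    total_bytes = 0
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=chunk_size):
            # Replace this with your actual per-chunk processing logic
            total_bytes += len(chunk)
    return total_bytes

if __name__ == "__main__":
    print(stream_payload("https://example.com/raw-data/large-file.mf4"))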