A Talend "Connect By" Example

Out of the box, Talend's Data Integration components cover many scenarios. However, there are some where there simply isn't a component that will do the job. In situations like that you can search for bespoke components that others have built, or even build one yourself. The problem is that these components are unsupported, and there is no guarantee they will continue to work when you upgrade your Talend version. A good way around this is to create a child Job that provides the functionality you require. To all intents and purposes, a Talend DI Job can be treated as a bespoke component: data can be supplied to the Job, and received back from it, with little effort. There is a "behind the scenes" code overhead, but in most situations that will not be a problem.

In this tutorial we will recreate some functionality that is very useful in the Oracle world. If you have a dataset with parent/child relationships (directory structures, for example), Oracle's "CONNECT BY" clause lets you build a chain of records using a recursive self join. In Talend, recursion is not easy (unless you are using web services) and there is no component for this sort of task. However, it can be achieved using standard Talend components and a bit of simple Java. This tutorial demonstrates how to build this functionality into a reusable Job.

The tutorial will describe two Jobs and a Java routine. The Jobs are the actual "ConnectBy" Job and a Job ("ConnectByExample") that uses it. The Java routine ("ConnectByJobData") is described first, below.....

 

The ConnectByJobData Routine

This is a simple Java class used to hold data. It is made up of 3 String variables and associated "getters" and "setters". The class is shown below......

package routines;

public class ConnectByJobData {

    private String record;
    private String id;
    private String parentId;

    public ConnectByJobData(String record, String id, String parentId) {
        this.record = record;
        this.id = id;
        this.parentId = parentId;
    }

    public String getRecord() {
        return record;
    }

    public void setRecord(String record) {
        this.record = record;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getParentId() {
        return parentId;
    }

    public void setParentId(String parentId) {
        this.parentId = parentId;
    }
}

This is used when the data is passed between the outer Job and the "ConnectBy" Job. An ArrayList is used to hold an instance of this class for every datarow.
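To make the hand-off concrete, here is a minimal, self-contained sketch of building that ArrayList. The class names and sample values are illustrative only; in the real Jobs the routines.ConnectByJobData class above is used, and the list is passed in via a context variable.

```java
import java.util.ArrayList;
import java.util.List;

public class ConnectByDataDemo {

    // Minimal copy of the routines.ConnectByJobData class described above,
    // nested here only so the sketch is self-contained
    public static class ConnectByJobData {
        private final String record;
        private final String id;
        private final String parentId;

        public ConnectByJobData(String record, String id, String parentId) {
            this.record = record;
            this.id = id;
            this.parentId = parentId;
        }

        public String getRecord() { return record; }
        public String getId() { return id; }
        public String getParentId() { return parentId; }
    }

    // One ConnectByJobData instance per datarow; the resulting list is what
    // the "ConnectBy" Job's "array" context variable receives
    public static List<ConnectByJobData> buildList() {
        List<ConnectByJobData> list = new ArrayList<ConnectByJobData>();
        list.add(new ConnectByJobData("c", "1", null));       // root: no parent id
        list.add(new ConnectByJobData("folder1", "2", "1"));  // child of "c"
        return list;
    }
}
```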

 

The ConnectBy Job

Below we can see the layout of the "ConnectBy" Job. You will notice that there are some deactivated tLogRow components. It is good practice to include tLogRow components in Jobs to make them easier to debug: while deactivated they behave as if they do not exist, and when switched on they simply log the rows passing through without disrupting your inputs and outputs. Be aware, though, that adding a new tLogRow to an existing flow can change the "row" names, which may mean adjusting the internals of tMap components or anywhere else a row name is referenced.

This Job handles all of the "Connect By" functionality. 

The functionality of this Job can be controlled by using 3 Context variables. These can be seen below....

The "array" variable is used to receive an ArrayList of "ConnectByJobData" objects. This is a Java class that is used to hold the data. This was explained at the beginning.
The "record_seperator" variable (note the spelling error in the variable name) is used to supply a separator for the chain of records that is built. So, where the parent/child records represent a folder structure, the resulting chain will be a folder path, and it might make sense to use "/" as the separator.
The "showIncrementalRecords" variable is a boolean used to tell the Job to either include or exclude incremental records. By that, I mean that if you have a folder structure that ultimately ends as "c/folder1/folder2/folder3", showing the incremental records would return the below recordset....

"c"
"c/folder1"
"c/folder1/folder2"
"c/folder1/folder2/folder3"


Not showing the incremental records would just return "c/folder1/folder2/folder3"
 

1) "Convert input array to a datarow" (tJavaFlex)

This component is used to take the data from the context.array variable and convert it into datarows. The configuration of this component can be seen below....

First we need to set the schema. To do this click on the "Edit schema" button (circled in red) and a popup window will appear. Create the schema that can be seen above by clicking the green plus symbol (circled in red) and filling in the column name and type.

The code that needs to be added to the tJavaFlex is shown above and can be copied from below.....

Start Code

// Use an Iterator to access the ArrayList
java.util.Iterator<ConnectByJobData> it = ((java.util.ArrayList<ConnectByJobData>)context.array).iterator();

while(it.hasNext()){

Main Code

// Get values from the ConnectByJobData object retrieved from the ArrayList
ConnectByJobData data = it.next();
row1.record = data.getRecord();
row1.id = data.getId();
row1.parentId = data.getParentId();
End Code
// end of the component, outside/closing the loop
}

 

2) "In memory store of records" (tHashOutput)

This component is used to keep all of the datarows generated by the previous component "in-memory". As long as the previous component's schema has been set, this component's schema will be configured automatically.

 

3+4) "Main Record input" and "Lookup Record input" (tHashInput)

These components are used to input the data stored "in-memory" by the "In memory store of records" component. These components are configured in the same way. This can be seen below.....

The key thing to be done with these components is to set the schema. To do this, click on the "Edit schema" button (circled in red) and a popup will appear. Click on the green plus symbol in the popup (circled in red) and recreate the schema that can be seen above. The next thing to do is to link these tHashInput components to the tHashOutput component that put the data in memory. To do this, select the correct component from the "Component list" drop down (circled in red).

 

5) "Link Child and Parent record" (tMap)

This component is used to link the parent and child record values to the child record id. The configuration of this component can be seen below....

 

6) "Store of Child and Parent linked records" (tHashOutput)

This component is used to keep all of the datarows generated by the previous tMap component "in-memory". As long as the previous component's schema has been set, this component's schema will be configured automatically.

 

7) "Set global variable values to keep track of Loop" (tJava)

This component is used to enable the use of Java code to set global variables which are used to keep track of the following tLoop component. The code to set these is seen below...

globalMap.put("beforeRecordCount", 0);
globalMap.put("afterRecordCount", -1);

 

8) "tLoop_1" (tLoop)

This component is used to drive a looping mechanism which ultimately builds the parent/child values into "chains" of those values. The configuration of this can be seen below....

In order to configure this component we need to set the "Loop Type" (circled in red) to "While" and set the "Condition" (circled in red) to....

((Integer)globalMap.get("beforeRecordCount")).intValue()!=((Integer)globalMap.get("afterRecordCount")).intValue()

The other fields can be left as they are.
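The loop is effectively a fixed-point iteration: it keeps running until a pass adds no new records, which is when "beforeRecordCount" equals "afterRecordCount". The sketch below simulates that mechanism outside Talend, using a plain HashMap in place of globalMap; the growth array is a hypothetical stand-in for how many records each pass resolves.

```java
import java.util.HashMap;
import java.util.Map;

public class LoopConditionDemo {

    // Returns the number of passes the "While" loop would make, where
    // growth[i] is the number of records resolved in pass i (0 afterwards)
    public static int countPasses(int[] growth) {
        Map<String, Object> globalMap = new HashMap<String, Object>();

        // Initial values set by the tJava component (step 7)
        globalMap.put("beforeRecordCount", 0);
        globalMap.put("afterRecordCount", -1);

        int size = 0;
        int passes = 0;

        // The tLoop "While" condition from step 8
        while (((Integer) globalMap.get("beforeRecordCount")).intValue()
                != ((Integer) globalMap.get("afterRecordCount")).intValue()) {
            // The tJavaFlex Start code records the size before the pass...
            globalMap.put("beforeRecordCount", size);
            if (passes < growth.length) {
                size += growth[passes];
            }
            // ...and its End code records the size after the pass
            globalMap.put("afterRecordCount", size);
            passes++;
        }
        return passes;
    }
}
```

The deliberate -1 starting value guarantees the condition is true on the first pass; after that, a pass that resolves nothing leaves the two counts equal and the loop stops.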

 

9) "Child/Parent linked records" (tHashInput)

This component is used to retrieve the records stored in-memory by the "Store of Child and Parent linked records" component. For every iteration of the loop, every record is retrieved and used to build strings of the record values "chained" together. This component is linked to the tLoop by an "iterate" link.

This component is configured by first creating the schema. It needs to be the same as the schema for the "Store of Child and Parent linked records" component. This can be created by clicking on the "Edit schema" button (circled in red) which will create a popup. Then click on the green plus symbol (circled in red) for each column that needs creating. Populate the "Column" and "Type" fields as shown above. You can also copy the schema from the "Store of Child and Parent linked records" component and paste it here. 

The columns that are required are below....

Column           Type
child_record     String
parent_record    String
child_id         String

 

The last thing that needs to be done with this component is to set the component that this is linked to. In this case it is the "Store of Child and Parent linked records" component. Select that component from the "Component list" drop down (circled in red).

 

10) "tJavaFlex_3" (tJavaFlex)

This component is used to build the parent/child record "chains". The configuration of this component can be seen below...

First, the output schema needs to be sorted out. For this component we want to output two strings; record and child_id. To do this, click on the "Edit schema" button (circled in red) and click the green plus symbol (circled in red) for each column.

Once that has been done, we need to add the "Start code", "Main code" and "End code". The code for this is explained in-line and is included below. Be aware that row names are referenced; these may be different in your version.

Start Code

// Create a "records" global variable to store incremental stages of the final record build up
java.util.HashMap<String, String> records = null;

if(globalMap.get("records")==null){
    
    records = new java.util.HashMap<String, String>();
    
}else{

    records = (java.util.HashMap<String, String>)globalMap.get("records");

}

// Put the current "records" size in a variable to keep track of the "records" hashmap growth
// When there are no more records, the loop will stop

globalMap.put("beforeRecordCount", records.size());

 

Main Code

// Receive the datarow values
String rowChildValue = row8.child_record;
String rowParentValue = row8.parent_record; 

// If the "rowParentValue" field is null, it means the record is "root" 
if(rowParentValue==null){
    records.put(rowChildValue, rowChildValue);

}else{ // All other records are children of a "root"

    // Look for the parent record. It might not exist yet depending on original order of records
    if(records.get(rowParentValue)!=null){
    
        String tempRecord = records.get(rowParentValue);
        
        // Add the new child record to the "chain"
        records.put(rowChildValue, tempRecord+context.record_seperator+rowChildValue);
    }
}

// Return current "chain" of records to the next component
row3.record = records.get(rowChildValue);
row3.child_id = row8.child_id;

End Code
// Update the "records" global variable and update the "afterRecordCount" globals variable
globalMap.put("records",records);     
globalMap.put("afterRecordCount", records.size());
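Putting steps 7 to 10 together, the chain-building logic can be exercised outside Talend. The sketch below (class and field names are illustrative, not part of the Job) replays all the rows until a pass adds nothing new, which is exactly what the tLoop condition checks. Note how rows whose parent has not been resolved yet are simply skipped and picked up on a later pass.

```java
import java.util.HashMap;
import java.util.Map;

public class ChainBuildDemo {

    // rows[i] = {child_record, parent_record (null for a root), child_id}
    public static Map<String, String> buildChains(String[][] rows, String separator) {
        Map<String, String> records = new HashMap<String, String>();
        int before;
        int after;
        do {
            before = records.size();
            for (String[] row : rows) {
                String child = row[0];
                String parent = row[1];
                if (parent == null) {
                    // Root records start their own chain
                    records.put(child, child);
                } else if (records.get(parent) != null) {
                    // Parent already resolved: extend its chain with this child
                    records.put(child, records.get(parent) + separator + child);
                }
                // Otherwise skip for now; a later pass will pick this row up
            }
            after = records.size();
        } while (before != after); // stop when a full pass adds no new chains
        return records;
    }
}
```

With rows supplied in a deliberately awkward order (children before their parents), the map still converges to the full folder paths after a few passes.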

 

11) "Built 'chain' records" (tHashOutput)

This component is used to keep all of the datarows generated by the previous component "in-memory". As long as the previous component's schema has been set, this component's schema will be configured automatically.

 

12) ""Chain" records" (tHashInput)

This component is used to retrieve all of the records placed in-memory by the "Built 'chain' records" component. It needs to have the same schema as the "Built 'chain' records" component. This is done in the same way as has been shown previously. The "Component list" field must be set to "Built 'chain' records".

 

13) "Remove duplicates" (tAggregateRow)

This component is used to remove duplicate values that may have been created by the looping mechanism. This is done by simply grouping the datarows by both the columns. The configuration of this component can be seen below....

First we need to set up the schema that will be passed on. To do this, click on the "Edit schema" button (circled in red) and a popup will appear. On the popup, click on the double right pointing arrows (circled in red) to copy the input schema to the output. 

To set the "Group by" section, simply click twice on the green plus symbol (circled in red) and select "record" and "child_id".

 

14) "Sort 'chain' records in descending order" (tSortRow)

This component is used to sort the datarows into descending alphabetic order. The configuration of this component can be seen below....

 

The schema for this component is automatically configured by connecting it to the previous component. So, all we need to do is to set the column that we are using to order the data. In this case we are using the "record" column. To do this, click on the green plus symbol (circled in red) and then select all of the values shown above in red.

 

15) "Ordered records" (tHashOutput)

This component is used to keep all of the datarows generated by the previous component "in-memory". As long as the previous component's schema has been set, this component's schema will be configured automatically.

 

16) "Dummy" (tJava)

This component doesn't actually do anything in itself. It is simply there as a point where a decision can be made. The decision is carried out by the "RunIf" links that come from it.
 
If(order:1)
This link is followed only if the Context variable "showIncrementalRecords" is false. The IF condition is shown below.....
!context.showIncrementalRecords

 

If(order:2)
This link is followed only if the Context variable "showIncrementalRecords" is true. The IF condition is shown below....

context.showIncrementalRecords
 

17+20) "Ordered Records" (tHashInput)

These components are used to retrieve all of the records placed in-memory by the "Ordered Records" tHashOutput component. These components are configured in the same way. They need to have the same schema as the "Ordered Records" tHashOutput component. This is done in the same way as has been shown previously. The "Component list" field must be set to the "Ordered Records" tHashOutput component.

 

18) "Remove incremental and null records" (tMap)

This component is used to remove incomplete chains of data that were used while building the hierarchical chains from root to leaf. It is also used to remove any null records that will also be created during the process. The tMap configuration can be seen below.....

By this time in the tutorial, I am assuming you will be able to configure this component from the picture and the details below. Please post any questions (if any) at the bottom.

tMap Var fields

Expression                                    Type      Variable
Var.thisValue!=null ? Var.thisValue : null    String    lastValue
row11.record                                  String    thisValue

The Var configuration is important here, and some little-known functionality is being used. Talend tMap variables retain their values between rows and are evaluated from top to bottom. So in this example, the first time "lastValue" is set, it checks the value of "thisValue", which has not yet been set and is therefore null. Then "thisValue" is set to the value of "row11.record". On the next row, "lastValue" is set to the value still held by "thisValue", which is the previous row's "row11.record". This is a really useful mechanism for comparing ordered rows of data.
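The mechanism can be reproduced in plain Java: evaluate the variables top to bottom for each row, and let them keep their values across rows. This is an illustrative sketch, not Talend-generated code; the variable names follow the tMap configuration above.

```java
import java.util.ArrayList;
import java.util.List;

public class TmapVarDemo {

    // For each input record, return the value the "lastValue" Var would hold,
    // i.e. the record seen on the previous row (null on the first row)
    public static List<String> lastValues(String[] inputRecords) {
        List<String> result = new ArrayList<String>();
        String thisValue = null;
        String lastValue;
        for (String record : inputRecords) {
            // Vars are evaluated top to bottom: lastValue is assigned first,
            // while thisValue still holds the previous row's record
            lastValue = (thisValue != null) ? thisValue : null;
            thisValue = record;
            result.add(lastValue);
        }
        return result;
    }
}
```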

The other important part of this tMap is the "out1" filter. This can be seen below....

Var.thisValue!=null&&(Var.lastValue==null || (Var.lastValue.indexOf(Var.thisValue+context.record_seperator)==-1))

 

This filter filters out any records which are a subset of another record. So given the following records.....

a/b/c/d/e/f
a/b/c/d
a/b/c/d/e
1/2/3/4/5
1/2/3
1/2/3/4

....... only the following records would be returned....

a/b/c/d/e/f
1/2/3/4/5
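Applied to the example records above (already sorted in descending order, as step 14 guarantees), the filter plays out like the following sketch. This assumes "/" as the record separator; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalFilterDemo {

    // Keep only records that are not a leading subset of the previous
    // (descending-sorted) record, i.e. only complete root-to-leaf chains
    public static List<String> filter(String[] sortedRecords, String separator) {
        List<String> out = new ArrayList<String>();
        String thisValue = null;
        String lastValue;
        for (String record : sortedRecords) {
            lastValue = thisValue; // previous row's record, null on the first row
            thisValue = record;
            // The "out1" filter expression from the tMap
            if (thisValue != null && (lastValue == null
                    || lastValue.indexOf(thisValue + separator) == -1)) {
                out.add(thisValue);
            }
        }
        return out;
    }
}
```

Because the rows are sorted in descending order, every incomplete chain arrives immediately after a longer chain that contains it, so a single previous-row comparison is enough to drop it.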

 

19+20) "Return result" (tBufferOutput)

These components are used to send the datarows generated by this Job back to the parent Job. These components are automatically configured to return the datarows using the schema of the previous component. All that needs to be done is for these components to be connected.

 

21) "Remove null records" (tMap)

This component is used to remove any null records that will be created during the process. The configuration of this component is exactly the same as the "Remove incremental and null records" component, but the filter on the output table only filters out null records. Copy the configuration of the "Remove incremental and null records" component and replace the filter with the one below....

Var.thisValue!=null

 

 

The ConnectByExample Job

This Job is simply used to demonstrate the "ConnectBy" Job. In this Job we read hierarchical data from a flat file and write the output to System.out. The structure of this Job can be seen below....

 

1) "Read file" (tFileInputDelimited)

This component is used to read a flat file (included in this tutorial). It is a basic flat file with 3 columns. Those 3 columns need a schema configured for them. The configuration of this component can be seen below... 

To create the schema, click on the "Edit schema" button (circled in red) and a popup will appear. On the popup, click on the green plus symbol (circled in red) for each column. Populate the details as above.

The other thing to do is to point the component at the actual file. I have chosen to hardcode the path in the example above as this is just a basic Job to demonstrate the functionality; you may wish to use a context variable for this. The file has been configured to use a semicolon as the separator, which happens to match the default setting of this component.

 

2) "Load data to ArrayList" (tJavaFlex)

This component is used to populate a global ArrayList variable with the data from the file. The schema will be set when it is connected. All you need to do is add the following code to the appropriate sections. Remember that the incoming row name may not be "row1" in your version.

Start Code

// start part of your Java code
java.util.ArrayList<ConnectByJobData> data = new java.util.ArrayList<ConnectByJobData>(); 

 

Main Code

//Add the column data to the object
    data.add(new ConnectByJobData(row1.value, row1.id, row1.parentId));

 

 
End Code
// Add the ArrayList to the globalMap HashMap
globalMap.put("data", data);

 
 

3) "Call ConnectBy Job" (tRunJob)

The tRunJob is mostly configured by simply dragging the "ConnectBy" Job on to the workspace. However, to pass the data to this Job to be processed we need to configure the context variables that are used. We also need to configure the schema of the component so that data can be returned. The configuration of this component can be seen below....

To configure the schema, click on the "Edit schema" button (circled in red) and a popup will appear. Click on the green plus symbol (circled in red) to recreate the schema of the "ConnectBy" Job's "Return Result" components.

This Job needs to receive values for 3 context variables; array, showIncrementalRecords and record_seperator.

The array context variable needs to be set to the value of the globalMap variable "data". This is the ArrayList that is populated by the "Load data to ArrayList" component.
The showIncrementalRecords context variable is used to decide whether all incremental "chains" are returned or whether just the final root to leaf "chains" are returned.
The record_seperator context variable is used to supply the value that will be used to separate the individual values in a returned "chain".

 

4) "Print results" (tLogRow)

This component is used to print the result to System.out. It needs no configuration apart from being connected to the previous component.

 

Running the Job

This tutorial is supplied with an example flat file to be used. The format and content of the flat file is shown below....

35;9;34
18;R;17
3;C;2
9;I;8
36;10;35
5;E;4
6;F;5
28;2;27
30;4;29
13;M;12
8;H;7
4;D;3
10;J;9
32;6;31
11;K;10
29;3;28
17;Q;16
14;N;13
7;G;6
15;O;14
16;P;15
24;X;23
33;7;32
22;V;21
20;T;19
21;U;20
1;A;
23;W;22
26;Z;25
2;B;1
25;Y;24
27;1;
12;L;11
31;5;30
34;8;33
19;S;18

 

The file format shown above is id; data value; parent_id
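In the Job this splitting is handled by the tFileInputDelimited schema, but for clarity here is a sketch of what parsing one line amounts to. Note the trailing empty field on root records such as "1;A;", which should become a null parent id; the -1 limit on split preserves that empty field. The class name is hypothetical.

```java
public class LineParseDemo {

    // Split an "id;value;parent_id" line; an empty parent_id marks a root
    // record and is mapped to null
    public static String[] parse(String line) {
        String[] parts = line.split(";", -1); // -1 keeps the trailing empty field
        String parentId = parts[2].isEmpty() ? null : parts[2];
        return new String[] { parts[0], parts[1], parentId };
    }
}
```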

When this file is processed it will return one of two different results, depending on the setting of the showIncrementalRecords context variable. If it is set to "true" then the result will be as below...

A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U*V*W*X*Y*Z|26
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U*V*W*X*Y|25
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U*V*W*X|24
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U*V*W|23
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U*V|22
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U|21
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T|20
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S|19
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R|18
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q|17
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P|16
A*B*C*D*E*F*G*H*I*J*K*L*M*N*O|15
A*B*C*D*E*F*G*H*I*J*K*L*M*N|14
A*B*C*D*E*F*G*H*I*J*K*L*M|13
A*B*C*D*E*F*G*H*I*J*K*L|12
A*B*C*D*E*F*G*H*I*J*K|11
A*B*C*D*E*F*G*H*I*J|10
A*B*C*D*E*F*G*H*I|9
A*B*C*D*E*F*G*H|8
A*B*C*D*E*F*G|7
A*B*C*D*E*F|6
A*B*C*D*E|5
A*B*C*D|4
A*B*C|3
A*B|2
A|1
1*2*3*4*5*6*7*8*9*10|36
1*2*3*4*5*6*7*8*9|35
1*2*3*4*5*6*7*8|34
1*2*3*4*5*6*7|33
1*2*3*4*5*6|32
1*2*3*4*5|31
1*2*3*4|30
1*2*3|29
1*2|28
1|27

The file contains two sets of data: the alphabet (A-Z) and a numeric set (1-10). As we are including incremental records, we not only see a chain of A to Z and a chain of 1 to 10, but also the intermediate stages of those chains. With each row returned, we get the chain record and the id of the last child. If you compare the rows above to the original file, the id that is returned is the id (number) on the left.

If showIncrementalRecords is set to "false" then below will be the result...

A*B*C*D*E*F*G*H*I*J*K*L*M*N*O*P*Q*R*S*T*U*V*W*X*Y*Z|26
1*2*3*4*5*6*7*8*9*10|36

 

 

A copy of the completed tutorial code can be found here. The example file can be downloaded here. This tutorial was built using Talend 5.5.1 but can be imported into subsequent versions. It cannot be imported into earlier versions, so you will either need to upgrade or recreate it following the tutorial.

If you have any questions about this tutorial, please post them in the comments below so that others (who will inevitably have similar problems) can benefit from the discussion.
