Kafka
Apache Kafka is a distributed streaming platform. The Kafka connector enables Incorta to consume messages published to Kafka topics. This article guides you through the prerequisite steps that must be completed before adding Kafka as a data source and the parameters that need to be defined during the connection setup process.
Prerequisites
- Install the latest Incorta build. Visit Install and Upgrade Incorta for instructions.
- An up-and-running ZooKeeper. Kafka is built on top of the ZooKeeper synchronization service. See Set up a Zookeeper Cluster for instructions.
- A Kafka broker with an available topic (a sketch for creating one follows this list). Get started with Kafka here.
- A sample JSON file of the message stored on the Kafka topic.
- A valid .avro file generated using the Avro Extractor tool. The Avro Extractor is an external tool that generates an Avro file from sample JSON messages to create a schema readable by the Incorta application.
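If you do not yet have a topic to test with, you can create one with the kafka-topics.sh script that ships with Kafka. A minimal sketch, assuming a single local broker and the hypothetical topic name fruits used in the walkthrough below (on Kafka versions older than 2.2, replace --bootstrap-server localhost:9092 with --zookeeper localhost:2181):

```bash
# Create a single-partition topic named "fruits" on a local broker
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic fruits

# Confirm the topic exists
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```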
Kafka Parameters
After you choose Kafka as a data source, the New Data Source dialog will request these parameters (an illustrative set of values follows this list):
- Data Source Name
- Topic: A category or feed name to which records are published.
- Brokers List: A cluster of one or more servers.
- messageType Field: Specifies the JSON key whose value names the root table for messages. For example, if the key in the JSON message is {"entity":"Employee"}, enter "-messageType entity" for the schema to be named after the key-value, "Employee". The JSON text must be wrapped in braces: {"name":"value"}.
- Trim messageType after dash (optional): A Boolean value (true or false). When set to true, the messageType value is trimmed after the dash to prevent the use of unsupported characters. The default value is false.
- Kafka Version: Select the Kafka version from the dropdown menu.
- Enable Kafka Consumer: If enabled, the consumer process associated with the data source will be turned on and will bring in messages. If disabled, the consumer process will be turned off and no messages will be consumed.
- Data Expiration (optional): Sets the data expiration period as a number of days.
- Mapping File: An .avro file generated using the avroExtractor.jar tool, which is shipped in the bin directory under the <INSTALLATION_PATH>.
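For example, a hypothetical set of values for the walkthrough in the rest of this article might look like the following (every value here is a placeholder for your environment):

```
Data Source Name:             fruit_sales_kafka
Topic:                        fruits
Brokers List:                 localhost:9092
messageType Field:            -messageType entity
Trim messageType after dash:  false
Kafka Version:                (your broker version)
Enable Kafka Consumer:        enabled
Mapping File:                 fruitdata.json.avro (created below)
```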
Create a Sample JSON Message
- As the Incorta user, in the terminal, create fruitdata.json in the /tmp directory with vi:

```bash
sudo su incorta
cd /tmp
vi fruitdata.json
```
- In vi, copy the following to the file (i = insert mode):

```json
{
  "messageId": 143,
  "entity": "tblFruitSales",
  "data": {
    "id": 143,
    "dt": "2019-09-30 19:28:23",
    "item": "jackfruit",
    "quantity": 35,
    "price": 5.72
  }
}
```
Important: Remember to wrap the JSON text in braces.
- Save your changes (esc = read mode) and quit (:wq!).
- Confirm that you can read fruitdata.json in the terminal:

```bash
cat fruitdata.json
```
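With the sample message saved, you can publish it to your topic so Incorta has something to consume. A minimal sketch using the console producer that ships with Kafka, assuming a local broker, the hypothetical topic name fruits, and that jq is installed (on older Kafka versions, use --broker-list instead of --bootstrap-server):

```bash
# The console producer treats each input line as one message, so compact
# the pretty-printed JSON to a single line before publishing it
jq -c . /tmp/fruitdata.json | bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic fruits
```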
Create .avro File with Avro Extractor
The Avro Extractor is an external tool that can be used to generate an Avro file from sample JSON messages to create a schema readable by the Incorta application.
To create an .avro file:
- Copy the javax.json.jar file to the IncortaNode/bin directory:

```bash
cp /home/incorta/IncortaAnalytics/IncortaNode/runtime/lib/javax.json-1.0.4.jar /home/incorta/IncortaAnalytics/IncortaNode/bin/
```
- Verify that the .jar file was copied:

```bash
ls -l /home/incorta/IncortaAnalytics/IncortaNode/bin/ | grep javax
```

- Navigate to the IncortaNode/bin directory:

```bash
cd /home/incorta/IncortaAnalytics/IncortaNode/bin/
```

- Run avroExtractor.jar (the Avro Extractor tool):

```bash
java -jar avroExtractor.jar -messageType "entity" -input /tmp/fruitdata.json
```
- Review the output message:

```
Extracting the schema for: /tmp/fruitdata.json
output to: /tmp/fruitdata.json.avro
messageType key is: entity
trimAfterDash: false
```

The output is a JSON file, fruitdata.json.avro, containing the Avro schema that describes the source message. Make note of this file location so you can import it into Incorta later.
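The exact contents of the generated file depend on the extractor version, but as a rough illustration only, the schema for the sample message might resemble a standard Avro record definition along these lines (field types inferred from the sample values; this is a sketch, not the literal tool output):

```json
{
  "type": "record",
  "name": "tblFruitSales",
  "fields": [
    {"name": "messageId", "type": "int"},
    {"name": "entity", "type": "string"},
    {"name": "data", "type": {
      "type": "record",
      "name": "data",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "dt", "type": "string"},
        {"name": "item", "type": "string"},
        {"name": "quantity", "type": "int"},
        {"name": "price", "type": "double"}
      ]
    }}
  ]
}
```

By default, Incorta flattens the fields of nested child objects such as data into the parent table (see the Kafka Annotations table below).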
Review and Modify the .avro File
- Open the .avro file in vi in the terminal:

```bash
vi /tmp/fruitdata.json.avro
```

- Add an annotation for the id field as a key (i = insert mode):

```json
{
  "path": "tblFruitSales.id",
  "isKey": "true"
}
```

- Save your changes and close vi (esc = read mode, quit = :wq!).
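Because the .avro file is plain JSON, it is worth confirming that hand edits did not break its syntax. One quick check, assuming Python is available on the server:

```bash
# Prints the parsed document on success; reports a parse error otherwise
python -m json.tool /tmp/fruitdata.json.avro
```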
Copy .avro File Content
- Open the file again with vi.
- Copy the contents of the file into a text editor on your desktop.
- Save the file as fruitdata.json.avro.
Kafka Annotations
In the generated Avro file, you may add annotations to ask Incorta to modify column or table definitions. For example, you could modify these definitions to ignore a table, set certain columns as keys, or identify maps.
The following table includes a full list of the supported annotations:
| # | Name | Syntax | Description | Example |
|---|------|--------|-------------|---------|
| 1 | Primary Key | isKey | Sets the given field/column to be part of the primary key of its parent table. This flag is set on the field level. | {"path":"employee.dsid", "isKey":"true"} |
| 2 | Set as table | isTable | The default behavior of Incorta is to flatten the fields of nested child JSON objects inside the parent. If a nested child should be a separate table, set the "isTable" flag to "true". | {"path":"employee.qualifications", "isTable":"true"} |
| 3 | Parent table | isParent | A nested child object in JSON is a child table in Incorta by default. However, if the table should be a parent table, set the "isParent" flag to "true". Note that "isParent" must be used in conjunction with "isTable" set to "true". | {"path":"employee.organization", "isTable":"true", "isParent":"true"} |
| 4 | Persistent | isPersistent | Set it to "false" if the data for a JSON nested object will be sent via another Kafka message and does not have to be in the current message. Note that "isPersistent" must be used in conjunction with "isTable". | {"path":"employee.organization", "isTable":"true", "isParent":"true", "isPersistent":"false"} |
| 5 | Map | isMap | If a nested JSON object is a map between the keys in the JSON and a set of child records, it should be a child table with "isMap" set to "true". Note that "isMap" cannot be used in conjunction with "isTable". | {"path":"employee.addresses", "isMap":"true"} |
| 6 | Map Key | <KEY> | In the "path", when referencing fields or records that are children of a map, the path includes a variable element referenced via the name <KEY>. | {"path":"employee.addresses.<KEY>.typeName", "isKey":"true"} |
| 7 | Array Map | tableName | If a map is a set of fields inside a record, the annotation must specify the names of those fields, along with the corresponding table name, because the map would not have a name in the JSON sample. The field names are listed inside the "path", comma-separated. Note that "tableName" must be used in conjunction with "isMap" set to "true". | {"path":"employee.addresses.<KEY>.local1,local2", "isMap":"employee.addresses.LocalAddresses"} |
| 8 | Ignored | isIgnored | Any table record marked with "isIgnored" will not be shown in discovery. | {"path":"employee.correction", "isIgnored":"true"} |
| 9 | Table Name | tableName | An alias table name can be added to any table, map, or array. | {"path":"country_lastModification", "isTable":"true", "tableName":"coun_last_mod"} |
| 10 | One-To-One | isOneToOne | When annotating a table with "isTable", you can also annotate it as a one-to-one table. Incorta will then not mark any columns as PRIMARY_KEY other than those inherited from the parent table. | {"path":"employee.demographic", "isTable":"true", "isOneToOne":"true"} |
| 11 | Source encrypted | isEncrypted | Annotating a field with "isEncrypted" means it is encrypted at the source and needs to be decrypted by Incorta using a custom crypto class. | {"path":"employee.back.firstName", "isEncrypted":"true"} |
| 12 | Encryption name | encryptionName | This annotation should follow "isEncrypted":"true". The value is the crypto name that should be configured in the kafka-crypto.properties file. | {"path":"employee.basic.firstName", "isEncrypted":"true", "encryptionName":"person"} |