Kafka

Apache Kafka is a distributed streaming platform; the Kafka connector enables Incorta to consume messages published to Kafka topics. This article will guide you through the prerequisite steps that must be completed before adding Kafka as a data source and the parameters that need to be defined during the connection set-up process.

Prerequisites

  1. Install the latest Incorta build. Visit Install and Upgrade Incorta for instructions.
  2. An up-and-running ZooKeeper. Kafka is built on top of the ZooKeeper synchronization service. See Set up a Zookeeper Cluster for instructions.
  3. A Kafka broker with an available topic. Get started with Kafka here; a minimal topic-creation sketch follows this list.
  4. A sample JSON file of the message stored on the Kafka topic.
  5. A valid Avro file generated using the Avro Extractor tool. The Avro Extractor is an external tool that can be used to generate an Avro file from sample JSON messages to create a schema readable by the Incorta application.
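
For prerequisite 3, the following is a minimal sketch of creating a topic on a local single-broker setup. The topic name fruitdata and the address localhost:9092 are assumptions for this example; on Kafka versions older than 2.2, pass --zookeeper localhost:2181 instead of --bootstrap-server:

    kafka-topics.sh --create --topic fruitdata --partitions 1 \
      --replication-factor 1 --bootstrap-server localhost:9092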

Kafka Parameters

After you choose Kafka as a data source, the New Data Source dialog will request these parameters:

  • Data Source Name
  • Topic: A category or feed name to which records are published.
  • Brokers List: A comma-separated list of the host:port pairs of one or more Kafka brokers in the cluster, for example, kafka01:9092,kafka02:9092 (see the connectivity check after this list).
  • messageType Field:

    • Specifies the JSON key whose value names the root table for each message.
    • For example, if the key in the JSON message is {"entity":"Employee"}, enter entity (the same value passed to the Avro Extractor as -messageType entity) so that the schema is named after the key's value, "Employee".
    • The JSON text must be wrapped in braces: {"name":"value"}.
  • Trim messageType after dash (optional): A Boolean (true or false) that, when true, trims the messageType value at the first dash to prevent the use of unsupported characters; for example, Employee-01 would become Employee. The default value is false.
  • Kafka Version: Select Kafka version from the dropdown menu.
  • Enable Kafka Consumer:

    • If enabled, the consumer process associated with the data source is turned on and brings in messages.
    • If disabled, the consumer process is turned off and no messages are consumed.
  • Data Expiration (optional): Sets the number of days after which consumed data expires.
  • Mapping File: An .avro file that is generated using the avroExtractor.jar tool shipped in the bin directory under the <INSTALLATION_PATH>.
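
Before saving the data source, it can help to confirm that the values you plan to enter for Topic and Brokers List are reachable. As a sketch, assuming the Kafka CLI tools are on your path and a broker at localhost:9092 (on Kafka versions older than 2.2, pass --zookeeper <host:port> instead of --bootstrap-server):

    kafka-topics.sh --list --bootstrap-server localhost:9092

The topic you intend to use should appear in the output.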

Create a Sample JSON Message

  1. As the Incorta user, in the terminal, in the /tmp directory, create the fruitdata.json file with vi.

    sudo su incorta
    cd /tmp
    vi fruitdata.json
  2. In vi, copy the following into the file (press i for insert mode).

    { "messageId": 143, "entity": "tblFruitSales", "data": { "id": 143, "dt": "2019-09-30 19:28:23", "item": "jackfruit", "quantity": 35, "price": 5.72 } }

    Important: Remember to wrap the JSON text in braces.

  3. Save your changes (press Esc to leave insert mode, then type :wq! to write and quit).
  4. Confirm that you can read fruitdata.json in the terminal:

    cat fruitdata.json
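
The Avro Extractor only needs this sample file on disk, but for Incorta to consume anything, the same message must also be published to the Kafka topic. As a minimal sketch, assuming a broker at localhost:9092 and a topic named fruitdata, you can publish the sample with the console producer shipped with Kafka (on newer Kafka versions, use --bootstrap-server instead of --broker-list):

    kafka-console-producer.sh --broker-list localhost:9092 --topic fruitdata < /tmp/fruitdata.json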

Create .avro File with Avro Extractor

The Avro Extractor is an external tool that can be used to generate an Avro file from sample JSON messages to create a schema readable by the Incorta application.

To create an .avro file:

  1. Copy javax.json.jar to the IncortaNode/bin directory:

    cp /home/incorta/IncortaAnalytics/IncortaNode/runtime/lib/javax.json-1.0.4.jar  /home/incorta/IncortaAnalytics/IncortaNode/bin/
  2. Verify that the .jar file is copied:

    ls -l  /home/incorta/IncortaAnalytics/IncortaNode/bin/ | grep javax
  3. Navigate to the IncortaNode/bin directory:

    cd /home/incorta/IncortaAnalytics/IncortaNode/bin/
  4. Run avroExtractor.jar (the Avro Extractor tool).

    java -jar avroExtractor.jar -messageType "entity" -input /tmp/fruitdata.json
  5. Review the output message:

    Extracting the schema for: /tmp/fruitdata.json
    output to: /tmp/fruitdata.json.avro
    messageType key is: entity
    trimAfterDash: false

    The output is a JSON file, fruitdata.json.avro, containing the Avro schema that describes the source message. Make a note of this file's location so that you can import it into Incorta later.
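
Before modifying the file, it is worth confirming that the output is well-formed JSON. A quick check, assuming jq is installed (python -m json.tool is an alternative):

    jq . /tmp/fruitdata.json.avro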

Review and Modify the .avro File

  1. Open the .avro file in vi in the terminal:

    vi /tmp/fruitdata.json.avro
  2. Add an annotation that marks the id field as a key (press i for insert mode):

    { "path": "tblFruitSales.id", "isKey": "true" }
  3. Save your changes and close vi (press Esc to leave insert mode, then type :wq! to write and quit).
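
A hand-edited annotation is an easy place to introduce a stray comma, so it is worth re-validating the file. A one-liner, assuming Python is available, that re-parses the JSON and then confirms the annotation is present:

    python -m json.tool /tmp/fruitdata.json.avro > /dev/null && grep -c '"isKey"' /tmp/fruitdata.json.avro

If the file parses, the command prints the number of isKey annotations; otherwise json.tool reports the position of the syntax error.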

Copy .avro File Content

  1. Open the file again with vi.
  2. Copy the contents of the file into a text editor on your desktop.
  3. Save the file as fruitdata.json.avro so that you can upload it as the Mapping File when creating the Kafka data source.

Kafka Annotations

In the generated Avro file, you may add annotations to ask Incorta to modify column or table definitions. For example, you could modify these definitions to ignore a table, set certain columns as keys, or identify maps.

The following is a full list of the supported annotations:

  1. Primary Key (isKey): Sets the given field or column to be part of the primary key of its parent table. This flag is set at the field level. Example: {"path": "employee.dsid", "isKey":"true"}
  2. Set as table (isTable): By default, Incorta flattens the fields of a nested child JSON object into its parent. If a nested child should instead become a separate table, set the "isTable" flag to "true". Example: {"path":"employee.qualifications", "isTable":"true"}
  3. Parent table (isParent): A nested child object in JSON becomes a child table in Incorta by default. If the table should instead be a parent table, set the "isParent" flag to "true". Note that "isParent" must be used in conjunction with "isTable" set to "true". Example: {"path":"employee.organization", "isTable":"true", "isParent":"true"}
  4. Persistent (isPersistent): Set to "false" if the data for a nested JSON object will be sent via another Kafka message and does not have to be in the current message. Note that "isPersistent" must be used in conjunction with "isTable". Example: {"path":"employee.organization", "isTable":"true", "isParent":"true", "isPersistent":"false"}
  5. Map (isMap): If a nested JSON object is a map between keys in the JSON and a set of child records, it should become a child table, with "isMap" set to "true". Note that "isMap" cannot be used in conjunction with "isTable". Example: {"path":"employee.addresses", "isMap":"true"}
  6. Map Key (<KEY>): When a "path" references fields or records that are children of a map, the path includes a variable element referenced by the name <KEY>. Example: {"path":"employee.addresses.<KEY>.typeName", "isKey":"true"}
  7. Array Map (tableName): If a map is a set of fields inside a record, the annotation must specify the names of those fields along with the corresponding table name, because the map would not have a name in the JSON sample. The field names are listed inside the "path", comma-separated. Note that "tableName" must be used in conjunction with "isMap" set to "true". Example: {"path":"employee.addresses.<KEY>.local1,local2", "isMap":"employee.addresses.LocalAddresses"}
  8. Ignored (isIgnored): Any table record marked as "isIgnored" will not be shown in discovery. Example: {"path":"employee.correction", "isIgnored":"true"}
  9. Table Name (tableName): An alias table name can be added to any table, map, or array. Example: {"path":"country_lastModification", "isTable":"true", "tableName":"coun_last_mod"}
  10. One-To-One (isOneToOne): When annotating a table with "isTable", you can also annotate it as a one-to-one table. Incorta will then not mark any columns as PRIMARY_KEY other than those inherited from the parent table. Example: {"path":"employee.demographic", "isTable":"true", "isOneToOne":"true"}
  11. Source encrypted (isEncrypted): Annotating a field with "isEncrypted" means that it is encrypted at the source and needs to be decrypted by Incorta using a custom crypto class. Example: {"path":"employee.basic.firstName", "isEncrypted":"true"}
  12. Encryption name (encryptionName): This annotation must follow "isEncrypted":"true". Its value is the crypto name configured in the kafka-crypto.properties file. Example: {"path":"employee.basic.firstName", "isEncrypted":"true", "encryptionName": "person"}
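
Putting several of these together: the hypothetical annotations below (drawn from the examples above) would mark dsid as a key, split qualifications into its own table, treat addresses as a map with typeName as its key, and hide correction from discovery. Exactly how annotations are embedded in the .avro file depends on the Avro Extractor version, so treat this as an illustrative list rather than verbatim file content:

    { "path": "employee.dsid", "isKey": "true" }
    { "path": "employee.qualifications", "isTable": "true" }
    { "path": "employee.addresses", "isMap": "true" }
    { "path": "employee.addresses.<KEY>.typeName", "isKey": "true" }
    { "path": "employee.correction", "isIgnored": "true" }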