Spark Best Practices

This section describes the best practices for using Spark for materialized views in Incorta.

Prerequisites

In order to use materialized views with Incorta, you need to have the Spark server installed. By default, an Incorta Node in the Incorta Unified Data Analytics Platform bundles Apache Spark as part of its installation. As a result, you have two options:

  1. Use Incorta’s bundled Spark. In this case, Apache Spark runs on the same machine as Incorta. This option is not recommended.

If you start the bundled Spark using Incorta’s launch scripts (startSpark.sh/stopSpark.sh), a Spark cluster starts on the same machine as Incorta.

  2. Use a separate Spark installation.

Whether you use the bundled Spark or a separate Spark installation, you can point multiple Incorta instances to that same Spark installation. The pros and cons of this configuration are listed below:

Pros:

  • Only one Spark instance to maintain, provided the host machine has enough resources to handle all incoming requests.
  • Running Spark and Incorta on the same machine makes it easy to meet the requirement that they share the same disk.

Cons:

  • A high risk of overloading the machine when the incoming requests exceed its available resources. This can lead to out-of-memory errors and system crashes, and resolving such issues may require reconfiguring Spark and Incorta to run on multiple or larger machines.

Spark Server Parameters

You should become familiar with some Spark configuration parameters. Their default values are defined in the spark-defaults.conf and spark-env.sh files in the following directory:

<INCORTA_INSTALLATION>/spark/conf
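To see which defaults are actually active, it can help to list the uncommented entries in spark-defaults.conf. The sketch below is a simplified parser, assuming the whitespace-separated "name value" form used in Spark's spark-defaults.conf.template and treating only lines starting with # as comments. parse_spark_defaults is a hypothetical helper, not part of Spark or Incorta.

```python
def parse_spark_defaults(text):
    """Return the active properties in spark-defaults.conf content as a dict."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and commented-out properties
        if not line or line.startswith("#"):
            continue
        # Split "name   value" on the first run of whitespace
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


sample = """
# spark.executor.memory   20g
spark.driver.memory       4g
spark.sql.shuffle.partitions 200
"""
print(parse_spark_defaults(sample))
# {'spark.driver.memory': '4g', 'spark.sql.shuffle.partitions': '200'}
```

The commented-out spark.executor.memory line is ignored, so Spark would fall back to its built-in default for that property.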

However, we highly recommend changing these parameters at the MV level instead: use the Data Source definition window in Incorta’s MV table definition page, as shown in the following image:

[Image: Spark properties in the MV table definition page]

To set a property, first uncomment it in the spark-env.sh file (this step is not needed for the properties listed in the Pre-Configured Properties table). Then click Add Property and give the property a specific value to override its default.

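Because a spark-env.sh variable only takes effect once it is uncommented, a quick check of which assignments are active can save confusion. This is a minimal sketch, assuming simple NAME=value or export NAME=value lines; it ignores inline comments, multi-line values, and shell expansion. active_env_settings is a hypothetical helper.

```python
import re

# An active assignment looks like NAME=value, optionally prefixed by "export"
ASSIGNMENT = re.compile(r"^(?:export\s+)?([A-Z_][A-Z0-9_]*)=(.*)$")


def active_env_settings(text):
    """Return the variables that are set (uncommented) in spark-env.sh content."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line.startswith("#"):
            continue  # commented out, so Spark keeps its built-in default
        match = ASSIGNMENT.match(line)
        if match:
            name, value = match.groups()
            settings[name] = value.strip().strip('"')
    return settings


sample = """
export SPARK_MASTER_PORT=7077
# SPARK_WORKER_MEMORY=8g
SPARK_WORKER_MEMORY=4g
"""
print(active_env_settings(sample))
# {'SPARK_MASTER_PORT': '7077', 'SPARK_WORKER_MEMORY': '4g'}
```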

The following are examples of MV properties to add. These are sample values; adjust them to suit your workload:

spark.driver.memory - 8G
spark.executor.memory - 20G
spark.executor.cores - 2  (1-5 max)
spark.cores.max - 4
spark.sql.shuffle.partitions - 500
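To see what such a configuration asks of the cluster, note that in Spark's standalone mode spark.cores.max caps the total cores an application may take, spark.executor.cores sets the cores per executor, and spark.executor.memory is allocated per executor. The sketch below estimates the footprint of the sample values, assuming the workers have enough free cores and memory and ignoring memory overhead. parse_properties, to_gib, and footprint are hypothetical helpers, and only m and g size suffixes are handled.

```python
def to_gib(size):
    """Convert a Spark size string such as '8G' or '512m' to GiB (m and g only)."""
    size = size.strip().lower()
    factors = {"m": 1 / 1024, "g": 1}
    return float(size[:-1]) * factors[size[-1]]


def parse_properties(text):
    """Parse 'name - value' lines, as listed above, into a dict of strings."""
    props = {}
    for line in text.strip().splitlines():
        name, value = line.split(" - ", 1)
        props[name.strip()] = value.split()[0]  # drop notes such as "(1-5 max)"
    return props


def footprint(props):
    """Estimate executors, cores, and memory for a standalone Spark application."""
    executor_cores = int(props["spark.executor.cores"])
    # spark.cores.max caps the cores the application may take across the cluster
    executors = int(props["spark.cores.max"]) // executor_cores
    memory = to_gib(props["spark.driver.memory"]) + executors * to_gib(props["spark.executor.memory"])
    return {"executors": executors, "total_cores": executors * executor_cores, "total_memory_gib": memory}


sample = """
spark.driver.memory - 8G
spark.executor.memory - 20G
spark.executor.cores - 2  (1-5 max)
spark.cores.max - 4
spark.sql.shuffle.partitions - 500
"""
print(footprint(parse_properties(sample)))
# {'executors': 2, 'total_cores': 4, 'total_memory_gib': 48.0}
```

Because memory is multiplied by the number of executors, raising spark.cores.max without lowering spark.executor.memory quickly increases the total.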

The following tables list Spark environment parameters, with their default values and, where available, their descriptions, for reference when configuring specific values:

Pre-Configured Properties

Parameter                  Default Value
SPARK_HISTORY_OPTS         -Dspark.history.fs.logDirectory=<INCORTA_INSTALLATION>/spark/eventlogs
SPARK_PUBLIC_DNS           <MACHINE_NAME>.local
SPARK_MASTER_IP            <MACHINE_NAME>.local
SPARK_MASTER_PORT          7077
SPARK_MASTER_WEBUI_PORT    9091
SPARK_WORKER_PORT          7078
SPARK_WORKER_WEBUI_PORT    9092
SPARK_WORKER_MEMORY        4g
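The pre-configured host and port values determine the addresses you use to reach the cluster: applications connect to a standalone master with a spark://HOST:PORT URL, and the master and worker web UIs listen on their own ports. This sketch builds those addresses from the defaults above, assuming the worker runs on the same machine as the master; spark_endpoints and the host name myhost.local are hypothetical.

```python
def spark_endpoints(env):
    """Build the Spark addresses implied by the pre-configured spark-env.sh values."""
    host = env["SPARK_MASTER_IP"]
    return {
        # Applications connect to the standalone master with a spark:// URL
        "master_url": f"spark://{host}:{env['SPARK_MASTER_PORT']}",
        "master_web_ui": f"http://{host}:{env['SPARK_MASTER_WEBUI_PORT']}",
        # Assumes the worker runs on the same machine as the master
        "worker_web_ui": f"http://{host}:{env['SPARK_WORKER_WEBUI_PORT']}",
    }


defaults = {
    "SPARK_MASTER_IP": "myhost.local",  # hypothetical <MACHINE_NAME>.local
    "SPARK_MASTER_PORT": "7077",
    "SPARK_MASTER_WEBUI_PORT": "9091",
    "SPARK_WORKER_WEBUI_PORT": "9092",
}
print(spark_endpoints(defaults)["master_url"])  # spark://myhost.local:7077
```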

Generic options for the daemons used in the standalone deploy mode

Parameter            Description                                                           Default Value
SPARK_CONF_DIR       Alternate configuration directory.                                    ${SPARK_HOME}/conf
SPARK_LOG_DIR        Where log files are stored.                                           ${SPARK_HOME}/logs
SPARK_PID_DIR        Where the PID file is stored.                                         /tmp
SPARK_IDENT_STRING   A string representing this instance of Spark.                         $USER
SPARK_NICENESS       The scheduling priority for daemons.                                  0
SPARK_NO_DAEMONIZE   Run the proposed command in the foreground; no PID file is written.   N/A
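Each daemon option falls back to the default in the table when it is not set. The sketch below resolves the effective values from an environment mapping (in practice you might pass os.environ); daemon_settings is hypothetical, the paths in the example are made up, and the sketch simply treats SPARK_NO_DAEMONIZE as enabled whenever it is present.

```python
def daemon_settings(env):
    """Resolve the standalone daemon options, falling back to the table's defaults."""
    spark_home = env["SPARK_HOME"]
    return {
        "SPARK_CONF_DIR": env.get("SPARK_CONF_DIR", f"{spark_home}/conf"),
        "SPARK_LOG_DIR": env.get("SPARK_LOG_DIR", f"{spark_home}/logs"),
        "SPARK_PID_DIR": env.get("SPARK_PID_DIR", "/tmp"),
        "SPARK_IDENT_STRING": env.get("SPARK_IDENT_STRING", env.get("USER", "")),
        "SPARK_NICENESS": int(env.get("SPARK_NICENESS", "0")),
        # SPARK_NO_DAEMONIZE has no default value; this sketch only checks presence
        "SPARK_NO_DAEMONIZE": "SPARK_NO_DAEMONIZE" in env,
    }


settings = daemon_settings({"SPARK_HOME": "/opt/incorta/spark", "USER": "incorta",
                            "SPARK_LOG_DIR": "/var/log/spark"})
print(settings["SPARK_CONF_DIR"], settings["SPARK_LOG_DIR"])
# /opt/incorta/spark/conf /var/log/spark
```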

Best Practices

The following are recommended best practices for configuring a Spark environment for running MV jobs in Incorta:

  • Balance the resources assigned to SQL Apps and MVs. If you are not using SQL Apps, assign zero cores to them and disable them.
  • When Spark and Incorta run on the same host, make sure the resources are divided so that both the materialized views and Incorta have enough.
  • On the Incorta machine, make sure enough resources are left for the Spark MV driver process.
  • Maximize overall throughput by making sure MV jobs can run in parallel. This typically means giving each job just enough cores to run. For executor memory, take garbage collection into account and do not allocate more than the job needs, since larger heaps tend to lengthen garbage-collection pauses.
  • Tune the jobs that require the most resources first.
  • First make sure the job can finish, then make it run faster.
  • Tune Compaction as well. Avoid spilling to disk, because it increases the duration of Compaction.
  • Balance the number of executors against the number of cores per executor. Keep in mind that memory is not shared across executors: each executor has its own memory, which is shared only by the tasks running on its cores.
  • Tune the number of shuffle partitions (spark.sql.shuffle.partitions). We typically match the number of shuffle partitions to the number of cores. If the Spark process keeps running out of memory, try increasing it to more than 2001 so that each partition is smaller and the job can finish.
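The reason for matching shuffle partitions to cores is that each shuffle partition becomes one task, and with the default spark.task.cpus = 1 each core runs one task at a time, so a stage runs in "waves" of as many tasks as there are cores. The sketch below shows that arithmetic and encodes the rule of thumb from the list above; both helpers are hypothetical and are not a Spark API.

```python
import math


def shuffle_waves(shuffle_partitions, total_cores):
    """Rounds of tasks a shuffle stage needs when each core runs one task at a time."""
    return math.ceil(shuffle_partitions / total_cores)


def suggest_shuffle_partitions(executors, cores_per_executor, out_of_memory=False):
    """Hypothetical rule of thumb following the guidance above."""
    total_cores = executors * cores_per_executor
    if out_of_memory:
        # More (and therefore smaller) partitions reduce the memory each task needs
        return max(total_cores, 2002)  # "more than 2001"
    return total_cores  # one partition per core: a single wave


print(shuffle_waves(500, 4))                               # 125
print(suggest_shuffle_partitions(2, 2))                    # 4
print(shuffle_waves(suggest_shuffle_partitions(2, 2), 4))  # 1
```

With 500 partitions on 4 cores, the stage needs 125 rounds of tasks; matching the partition count to the cores finishes in one round, at the cost of larger partitions.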

Materialized View Best Practices

  • If the MV consists of a single query, use SQL as the language. For more complex use cases, select Python.
  • Rather than denormalizing dimensional attributes into the MV itself, create an MV that contains only ID and metric fields, and join the ID fields to the keys of the parent tables to resolve the dimensional attributes.
  • An MV can support incremental logic if a proper key can be defined on it.
  • You can use ? inside the MV SQL. It stands for the timestamp of the MV's last successful transformation, just as ? does in regular Incorta SQL queries. The value is stored as a long that represents a timestamp. For example:
    SELECT * FROM incorta_metadata.DASHBOARD WHERE CREATIONDATE > ?

The most accurate way to run an incremental MV is to select the maximum date from the MV itself and load only the data newer than that date.
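The two cutoffs can be compared in plain Python. The ? value reflects when the MV last ran successfully, while the maximum date comes from the data actually stored in the MV, which is presumably why it is the more accurate cutoff. This sketch assumes the ? long is epoch milliseconds (verify the unit in your environment) and uses lists of dicts as stand-ins for DataFrames; last_success_as_datetime and new_rows are hypothetical helpers.

```python
from datetime import date, datetime, timezone


def last_success_as_datetime(last_success_ms):
    """Convert the long substituted for ? into a UTC datetime.

    ASSUMPTION: the long is epoch milliseconds.
    """
    return datetime.fromtimestamp(last_success_ms / 1000, tz=timezone.utc)


def new_rows(source_rows, mv_rows, date_field):
    """Keep only source rows dated after the latest date already stored in the MV."""
    if not mv_rows:
        return list(source_rows)  # empty MV: behave like a full load
    high_water_mark = max(row[date_field] for row in mv_rows)
    return [row for row in source_rows if row[date_field] > high_water_mark]


mv = [{"ID": 1, "CREATIONDATE": date(2024, 1, 1)}, {"ID": 2, "CREATIONDATE": date(2024, 1, 3)}]
source = [{"ID": 2, "CREATIONDATE": date(2024, 1, 3)}, {"ID": 3, "CREATIONDATE": date(2024, 1, 4)}]
print([row["ID"] for row in new_rows(source, mv, "CREATIONDATE")])  # [3]
print(last_success_as_datetime(86_400_000))  # 1970-01-02 00:00:00+00:00
```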

  • In an incremental MV, make sure that the field order, field names (which are case-sensitive), and field data types are the same in the full and incremental parts of the code. If the data types differ, use the CAST function; for example, to convert a timestamp to a date: CAST(thets AS DATE) AS thedate
  • Another way to implement incremental logic in an MV is to always refresh the last 30 days' worth of data, for example:
# Keep only the GL_JE_LINES rows whose EFFECTIVE_DATE falls within the last 30 days
from datetime import datetime, timedelta
from pyspark.sql.functions import lit
df_GL_JE_LINES = read("EBS_GL.GL_JE_LINES")
d = datetime.today() - timedelta(days=30)
df_GL_JE_LINES = df_GL_JE_LINES.filter(df_GL_JE_LINES.EFFECTIVE_DATE > lit(d))
save(df_GL_JE_LINES)
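The rule that the full and incremental parts of an incremental MV must agree on field order, case-sensitive names, and data types can be checked mechanically. In PySpark, DataFrame.dtypes returns a list of (column name, type name) pairs; this sketch compares two such lists position by position. schema_mismatches is a hypothetical helper, and the schemas below are made-up examples.

```python
def schema_mismatches(full_schema, incremental_schema):
    """List differences between two [(field_name, data_type), ...] schemas.

    Fields are compared position by position, and names are case-sensitive,
    so order, spelling, case, and type must all match.
    """
    problems = []
    if len(full_schema) != len(incremental_schema):
        problems.append(f"field count differs: {len(full_schema)} vs {len(incremental_schema)}")
    for position, (full, incremental) in enumerate(zip(full_schema, incremental_schema), start=1):
        if full != incremental:
            problems.append(f"field {position}: {full} vs {incremental}")
    return problems


full = [("ID", "bigint"), ("thedate", "date")]
incremental = [("ID", "bigint"), ("thets", "timestamp")]
print(schema_mismatches(full, incremental))
# ["field 2: ('thedate', 'date') vs ('thets', 'timestamp')"]
```

Applying CAST(thets AS DATE) AS thedate in the incremental part would make the second field match, and the check would return an empty list.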

Here are two sample scripts:

  • The following script creates an MV by reading two tables into DataFrames and joining them with Spark SQL. The base tables are in an Incorta schema called TEST and must already be loaded:
# Read the base tables and register them as temporary views for Spark SQL
df_customers = read("TEST.customers")
df_customers.createOrReplaceTempView("customers_table")
df_submitters = read("TEST.submitters")
df_submitters.createOrReplaceTempView("submitters_table")
# Join submitters to customers, preferring the revenue-group values when a match exists
df = spark.sql("""
 SELECT DISTINCT a.submitter_id,
    a.submitter_name,
    a.submitter_type,
    coalesce(upper(b.revenuegroupname), upper(a.customer_name)) AS submitter_finance_name,
    coalesce(b.revenuegroupnumber, a.customer_agn) AS submitter_financials_customer_id,
    a.vendor_id,
    a.vendor_name
 FROM submitters_table a
 LEFT OUTER JOIN customers_table b ON a.customer_agn = b.customerid
 WHERE lower(a.source_name) = 'test'
""")
# Persist the result as the MV
save(df)
  • Example of an incremental MV:

# Incremental MV example
from datetime import datetime, timedelta
from pyspark.sql.functions import lit

df_TRANSACTION_LINES_SOURCE = read("transactions.TRANSACTION_LINES_SOURCE")
df_ITEMS = read("items.ITEMS")
# Keep only the transaction lines modified in the last 7 days
d = datetime.today() - timedelta(days=7)
df_TRANSACTION_LINES_SOURCE = df_TRANSACTION_LINES_SOURCE.filter(df_TRANSACTION_LINES_SOURCE.DATE_MODIFIED > lit(d))

# Register the DataFrames as temporary views so Spark SQL can query them
df_TRANSACTION_LINES_SOURCE.createOrReplaceTempView("TRANSACTION_LINES_SOURCE")
df_ITEMS.createOrReplaceTempView("ITEMS")

df1 = spark.sql("""
SELECT ts.ACCOUNT_ID,
  ts.AMOUNT,
  ts.AMOUNT_TAXED,
  ts.COMPLETE_0,
  CAST(ts.DATE_ARRIVE AS TIMESTAMP) + INTERVAL '8' HOUR AS DATE_ARRIVE,
  COALESCE(ts.BRAND_ID, it.BRAND_ID)        AS CALCULATED_BRAND_ID
FROM TRANSACTION_LINES_SOURCE AS ts
LEFT OUTER JOIN ITEMS it
ON ts.ITEM_ID        = it.ITEM_ID
WHERE 1=1
    """)
save(df1)
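Both scripts rely on a LEFT OUTER JOIN combined with COALESCE. The plain-Python sketch below mirrors COALESCE(ts.BRAND_ID, it.BRAND_ID) from the incremental example: every left row is kept, and the right-hand value is used only when the left value is NULL. It uses dicts for rows and None for NULL, and assumes the join key is unique in the right-hand table (a real join would emit one row per match). left_join_coalesce is a hypothetical helper for illustration only.

```python
def left_join_coalesce(left_rows, right_rows, key, field):
    """Mimic LEFT OUTER JOIN ... ON left.key = right.key with COALESCE(left.field, right.field)."""
    right_by_key = {row[key]: row for row in right_rows}  # assumes unique keys
    joined = []
    for row in left_rows:
        match = right_by_key.get(row[key], {})  # no match: right side is all NULL
        value = row.get(field)
        if value is None:
            value = match.get(field)  # fall back to the right-hand value
        joined.append({**row, "CALCULATED_" + field: value})
    return joined


transaction_lines = [
    {"ITEM_ID": 1, "BRAND_ID": None},
    {"ITEM_ID": 2, "BRAND_ID": 7},
    {"ITEM_ID": 3, "BRAND_ID": None},
]
items = [{"ITEM_ID": 1, "BRAND_ID": 5}, {"ITEM_ID": 2, "BRAND_ID": 9}]
result = left_join_coalesce(transaction_lines, items, "ITEM_ID", "BRAND_ID")
print([row["CALCULATED_BRAND_ID"] for row in result])  # [5, 7, None]
```

Item 3 has no match but still appears in the output with a NULL calculated brand, which is what distinguishes a LEFT OUTER JOIN from an inner join.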

The following sample MV properties can improve performance. With spark.executor.cores and spark.cores.max both set to 2, Spark's standalone mode launches a single 2-core executor, so this configuration uses roughly 25G for the executor plus 4G for the driver. Adjust each value based on how Spark has been configured and the amount of free RAM:

spark.driver.memory - 4G
spark.executor.memory - 25G
spark.executor.cores - 2
spark.cores.max - 2
spark.sql.shuffle.partitions - 200
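Before applying values like these, it is worth checking that they fit. In standalone mode an executor has to fit within a single worker's memory, and the Pre-Configured Properties table sets SPARK_WORKER_MEMORY to 4g by default, so a 25G executor would not start unless that value is raised. This sketch checks both limits, assuming Spark and Incorta (which runs the MV driver process) share the same machine; check_fit is a hypothetical helper.

```python
def check_fit(executor_memory_gib, driver_memory_gib, executors,
              worker_memory_gib, free_ram_gib):
    """Return a list of reasons why a Spark memory configuration will not fit."""
    problems = []
    # In standalone mode an executor must fit inside a single worker's memory
    if executor_memory_gib > worker_memory_gib:
        problems.append("spark.executor.memory exceeds SPARK_WORKER_MEMORY; no executor fits on a worker")
    # Assumes Spark and Incorta (which hosts the MV driver) share this machine
    total = driver_memory_gib + executors * executor_memory_gib
    if total > free_ram_gib:
        problems.append(f"needs {total:g} GiB but only {free_ram_gib:g} GiB is free")
    return problems


# 25G executor, 4G driver, one executor (spark.cores.max / spark.executor.cores = 2 / 2)
print(check_fit(25, 4, 1, worker_memory_gib=4, free_ram_gib=40))
# ['spark.executor.memory exceeds SPARK_WORKER_MEMORY; no executor fits on a worker']
```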



© Incorta, Inc. All Rights Reserved.