
Integrating Amazon Glue with ClickHouse and Spark

Amazon Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS). It simplifies the process of discovering, preparing, and transforming data for analytics, machine learning, and application development.

Installation

To integrate your Glue code with ClickHouse, you can use our official Spark connector in Glue via one of the following:

  • Installing the ClickHouse Glue connector from the AWS Marketplace (recommended).
  • Manually adding the Spark connector's JARs to your Glue job (a minimal sketch follows the note below).

  To install from the AWS Marketplace, follow these steps:

  1. Subscribe to the Connector

    To access the connector in your account, subscribe to the ClickHouse AWS Glue Connector from AWS Marketplace.

  2. Grant Required Permissions

    Ensure your Glue job’s IAM role has the necessary permissions, as described in the minimum privileges guide.

  3. Activate the Connector & Create a Connection

    You can activate the connector and create a connection directly from this link, which opens the Glue connection creation page with key fields pre-filled. Give the connection a name and press Create (there is no need to provide the ClickHouse connection details at this stage).

  4. Use in Glue Job

    In your Glue job, select the Job details tab and expand the Advanced properties section. Under Connections, select the connection you just created. The connector automatically injects the required JARs into the job runtime.

[Image: Glue Notebook connections config]
Note

The JARs used in the Glue connector are built for Spark 3.3, Scala 2, and Python 3. Make sure to select these versions when configuring your Glue job.
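
If you choose the manual route instead, upload the connector JARs to S3 and point your job at them with Glue's --extra-jars job parameter (under Job details → Job parameters). A minimal sketch; the bucket path and artifact versions below are placeholders, so check the Spark connector's releases for the exact JARs matching your Spark and Scala versions:

Key:   --extra-jars
Value: s3://<your-bucket>/jars/clickhouse-spark-runtime-3.3_2.12-<version>.jar,s3://<your-bucket>/jars/clickhouse-jdbc-<version>-all.jar

Glue adds every JAR listed in this parameter to the job's Java classpath at runtime.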

Examples

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]): Unit = {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // For ClickHouse Cloud, keep SSL enabled (ssl_mode NONE skips server certificate verification)
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Read from ClickHouse through the catalog
    val dfRead = sparkSession.sql("SELECT * FROM clickhouse.default.cell_towers")
    dfRead.show()

    Job.commit()
  }
}
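
Reads against the catalog go through plain Spark SQL. As a hedged follow-up sketch (meant to run inside the same main method, before Job.commit(), so it reuses the sparkSession configured above and the cell_towers table written to earlier):

// Aggregate rows back out of ClickHouse via the catalog.
val counts = sparkSession.sql(
  "SELECT radio, count(*) AS towers " +
  "FROM clickhouse.default.cell_towers GROUP BY radio ORDER BY towers DESC"
)
counts.show()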

For more details, please visit our Spark documentation.