
Spark Connector

This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to improve query performance and data handling. The connector is based on ClickHouse's official JDBC connector, and manages its own catalog.

Before Spark 3.0, Spark lacked a built-in catalog concept, so users typically relied on external catalog systems such as Hive Metastore or AWS Glue. With these external solutions, users had to register their data source tables manually before accessing them in Spark. However, since Spark 3.0 introduced the catalog concept, Spark can now automatically discover tables by registering catalog plugins.

Spark's default catalog is spark_catalog, and tables are identified as {catalog name}.{database}.{table}. With the new catalog feature, it is now possible to add and work with multiple catalogs in a single Spark application.
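
For illustration, once a ClickHouse catalog has been registered under the name clickhouse (as described in the "Register The Catalog" section below), a catalog-qualified query could look like the following sketch; the table names are hypothetical and a configured SparkSession named spark is assumed:

// Table resolved through the registered ClickHouse catalog
Dataset<Row> fromClickHouse = spark.sql("SELECT * FROM clickhouse.default.example_table");

// Table resolved through Spark's built-in catalog, addressed explicitly
Dataset<Row> fromSparkCatalog = spark.sql("SELECT * FROM spark_catalog.default.local_table");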

Requirements

  • Java 8 or 17
  • Scala 2.12 or 2.13
  • Apache Spark 3.3, 3.4, or 3.5

Compatibility Matrix

| Version | Compatible Spark Versions | ClickHouse JDBC version |
|---------|---------------------------|-------------------------|
| main    | Spark 3.3, 3.4, 3.5       | 0.6.3                   |
| 0.8.1   | Spark 3.3, 3.4, 3.5       | 0.6.3                   |
| 0.8.0   | Spark 3.3, 3.4, 3.5       | 0.6.3                   |
| 0.7.3   | Spark 3.3, 3.4            | 0.4.6                   |
| 0.6.0   | Spark 3.3                 | 0.3.2-patch11           |
| 0.5.0   | Spark 3.2, 3.3            | 0.3.2-patch11           |
| 0.4.0   | Spark 3.2, 3.3            | No dependency           |
| 0.3.0   | Spark 3.2, 3.3            | No dependency           |
| 0.2.1   | Spark 3.2                 | No dependency           |
| 0.1.2   | Spark 3.2                 | No dependency           |

Installation & Setup

For integrating ClickHouse with Spark, there are multiple installation options to suit different project setups. You can add the ClickHouse Spark connector as a dependency directly in your project's build file (such as pom.xml for Maven or build.sbt for SBT). Alternatively, you can place the required JAR files in your $SPARK_HOME/jars/ folder, or pass them directly as a Spark option using the --jars flag in the spark-submit command. Any of these approaches makes the ClickHouse connector available in your Spark environment.

Import as a Dependency

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Add the following repository if you want to use a SNAPSHOT version:

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Download The Library

The name pattern of the binary JAR is:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
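
For example, for connector version 0.8.1 built for Spark 3.5 and Scala 2.12 (see the compatibility matrix above), the JAR name would be:

clickhouse-spark-runtime-3.5_2.12-0.8.1.jar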

You can find all available released JARs in the Maven Central Repository and all daily build SNAPSHOT JARs in the Sonatype OSS Snapshots Repository.

Info

It's essential to include the clickhouse-jdbc JAR with the "all" classifier, as the connector relies on clickhouse-http and clickhouse-client, both of which are bundled in clickhouse-jdbc:all. Alternatively, you can add the clickhouse-client and clickhouse-http JARs individually if you prefer not to use the full JDBC package.

In any case, ensure that the package versions are compatible according to the Compatibility Matrix.

Register The Catalog (required)

In order to access your ClickHouse tables, you must configure a new Spark catalog with the following configs:

| Property | Value | Default Value | Required |
|----------|-------|---------------|----------|
| spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
| spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
| spark.sql.catalog.<catalog_name>.protocol | http | http | No |
| spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
| spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
| spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
| spark.sql.catalog.<catalog_name>.database | <database> | default | No |
| spark.<catalog_name>.write.format | json | arrow | No |

These settings can be set via one of the following:

  • Edit/Create spark-defaults.conf.
  • Pass the configuration to your spark-submit command (or to your spark-shell/spark-sql CLI commands).
  • Add the configuration when initiating your context.
Info

When working with a ClickHouse cluster, you need to set a unique catalog name for each instance. For example:

spark.sql.catalog.clickhouse1              com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host         10.0.0.1
spark.sql.catalog.clickhouse1.protocol     https
spark.sql.catalog.clickhouse1.http_port    8443
spark.sql.catalog.clickhouse1.user         default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database     default
spark.sql.catalog.clickhouse1.option.ssl   true

spark.sql.catalog.clickhouse2              com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host         10.0.0.2
spark.sql.catalog.clickhouse2.protocol     https
spark.sql.catalog.clickhouse2.http_port    8443
spark.sql.catalog.clickhouse2.user         default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database     default
spark.sql.catalog.clickhouse2.option.ssl   true

This way, you can access the clickhouse1 table <ck_db>.<ck_table> from Spark SQL as clickhouse1.<ck_db>.<ck_table>, and the clickhouse2 table <ck_db>.<ck_table> as clickhouse2.<ck_db>.<ck_table>.
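
As a sketch, assuming both catalogs above are registered and a SparkSession named spark exists (the database and table names are placeholders), the two instances can be queried, and even combined, in the same application:

// Read the same logical table from two different ClickHouse instances
Dataset<Row> fromFirst = spark.sql("SELECT * FROM clickhouse1.ck_db.ck_table");
Dataset<Row> fromSecond = spark.sql("SELECT * FROM clickhouse2.ck_db.ck_table");

// Because each catalog resolves its own tables, the results can be combined freely
Dataset<Row> combined = fromFirst.unionByName(fromSecond);
combined.show();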

Read Data

public static void main(String[] args) {
    // Create a Spark session with a ClickHouse catalog named "clickhouse"
    SparkSession spark = SparkSession.builder()
            .appName("example")
            .master("local[*]")
            .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
            .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
            .config("spark.sql.catalog.clickhouse.protocol", "http")
            .config("spark.sql.catalog.clickhouse.http_port", "8123")
            .config("spark.sql.catalog.clickhouse.user", "default")
            .config("spark.sql.catalog.clickhouse.password", "123456")
            .config("spark.sql.catalog.clickhouse.database", "default")
            .config("spark.clickhouse.write.format", "json")
            .getOrCreate();

    // Query the table through its catalog-qualified name
    Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

    df.show();

    spark.stop();
}
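
The same read can also be expressed with the DataFrame API instead of a SQL string. A minimal sketch, assuming the session and table from the example above:

// Resolve the table through the ClickHouse catalog
Dataset<Row> df = spark.table("clickhouse.default.example_table");

// Simple filters like this one can be pushed down to ClickHouse where the connector supports it
df.filter("id > 10").show();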

Write Data

public static void main(String[] args) throws AnalysisException {
    // Create a Spark session with a ClickHouse catalog named "clickhouse"
    SparkSession spark = SparkSession.builder()
            .appName("example")
            .master("local[*]")
            .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
            .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
            .config("spark.sql.catalog.clickhouse.protocol", "http")
            .config("spark.sql.catalog.clickhouse.http_port", "8123")
            .config("spark.sql.catalog.clickhouse.user", "default")
            .config("spark.sql.catalog.clickhouse.password", "123456")
            .config("spark.sql.catalog.clickhouse.database", "default")
            .config("spark.clickhouse.write.format", "json")
            .getOrCreate();

    // Define the schema for the DataFrame
    StructType schema = new StructType(new StructField[]{
            DataTypes.createStructField("id", DataTypes.IntegerType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false),
    });

    // Prepare the rows to insert
    List<Row> data = Arrays.asList(
            RowFactory.create(1, "Alice"),
            RowFactory.create(2, "Bob")
    );

    // Create a DataFrame
    Dataset<Row> df = spark.createDataFrame(data, schema);

    // Append the rows to the ClickHouse table
    df.writeTo("clickhouse.default.example_table").append();

    spark.stop();
}
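
Rows can also be appended with Spark SQL rather than the DataFrame writer. A small sketch, assuming the same session and table as above, and that your connector version accepts SQL inserts:

// Append a single row through the catalog-qualified table name
spark.sql("INSERT INTO clickhouse.default.example_table VALUES (3, 'Charlie')");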

DDL Operations

You can perform DDL operations on your ClickHouse instance using Spark SQL, with all changes immediately persisted in ClickHouse. Spark SQL allows you to write queries exactly as you would in ClickHouse, so you can directly execute commands such as CREATE TABLE, TRUNCATE, and more without modification, for instance:


USE clickhouse;

CREATE TABLE test_db.tbl_sql (
    create_time TIMESTAMP NOT NULL,
    m           INT       NOT NULL COMMENT 'part key',
    id          BIGINT    NOT NULL COMMENT 'sort key',
    value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
    engine = 'MergeTree()',
    order_by = 'id',
    settings.index_granularity = 8192
);

The above example demonstrates a Spark SQL query, which you can run within your application using any API: Java, Scala, PySpark, or the Spark SQL shell.
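
For example, the same kind of DDL can be issued programmatically; a short Java sketch, assuming a SparkSession named spark with the clickhouse catalog registered as in the previous sections:

// Switch the current catalog to ClickHouse
spark.sql("USE clickhouse");

// Inspect and clean up the table created above
spark.sql("SHOW TABLES IN test_db").show();
spark.sql("TRUNCATE TABLE test_db.tbl_sql");
spark.sql("DROP TABLE IF EXISTS test_db.tbl_sql");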

Configurations

The following are the adjustable configurations available in the connector:


| Key | Default | Description | Since |
|-----|---------|-------------|-------|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. cityHash64(col_1, col_2), which are currently not supported by Spark. If true, ignore the unsupported expressions; otherwise fail fast with an exception. Note that when spark.clickhouse.write.distributed.convertLocal is enabled, ignoring unsupported sharding keys may corrupt the data. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | When reading a Distributed table, read the local table instead of the Distributed table itself. If true, ignore spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | binary | Read the ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string. | 0.8.0 |
| spark.clickhouse.read.format | json | Serialization format for reading. Supported formats: json, binary. | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | If true, construct the input partition filter by the virtual column _partition_id instead of the partition value. There are known issues with assembling SQL predicates by partition value. This feature requires ClickHouse Server v21.6+. | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | If true, mark all fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ... to create the table. Note that this configuration requires SPARK-43390 (available in Spark 3.5); without this patch, it always acts as true. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | The number of records per batch when writing to ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | When writing a Distributed table, write to the local table instead of the Distributed table itself. If true, ignore spark.clickhouse.write.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of the cluster when writing a Distributed table. | 0.1.0 |
| spark.clickhouse.write.format | arrow | Serialization format for writing. Supported formats: json, arrow. | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | If true, do a local sort by sort keys before writing. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | value of spark.clickhouse.write.repartitionByPartition | If true, do a local sort by partition before writing. If not set, it equals spark.clickhouse.write.repartitionByPartition. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | The maximum number of write retries for a single batch that failed with retryable error codes. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to meet the distribution of the ClickHouse table before writing. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | Repartitioning data to meet the distribution of the ClickHouse table is required before writing; use this conf to specify the repartition number. A value less than 1 means no requirement. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | If true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note that this configuration requires SPARK-37523 (available in Spark 3.4); without this patch, it always acts as true. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0 |
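
These keys are regular Spark confs, so they can be set like any other configuration, for example on the session builder or at runtime. A brief sketch, assuming an existing SparkSession named spark and that the chosen values suit your workload:

// Tune write behavior at runtime (the values here are only illustrative)
spark.conf().set("spark.clickhouse.write.batchSize", "20000");
spark.conf().set("spark.clickhouse.write.compression.codec", "lz4");

// Switch the read serialization format
spark.conf().set("spark.clickhouse.read.format", "binary");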

Supported Data Types

This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide quick references for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse.

Reading data from ClickHouse into Spark

| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes |
|---|---|---|---|---|
| Nothing | NullType | Yes | Yes | |
| Bool | BooleanType | Yes | Yes | |
| UInt8, Int16 | ShortType | Yes | Yes | |
| Int8 | ByteType | Yes | Yes | |
| UInt16, Int32 | IntegerType | Yes | Yes | |
| UInt32, Int64, UInt64 | LongType | Yes | Yes | |
| Int128, UInt128, Int256, UInt256 | DecimalType(38, 0) | Yes | Yes | |
| Float32 | FloatType | Yes | Yes | |
| Float64 | DoubleType | Yes | Yes | |
| String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | Yes | Yes | |
| FixedString | BinaryType, StringType | Yes | Yes | Controlled by configuration READ_FIXED_STRING_AS |
| Decimal | DecimalType | Yes | Yes | Precision and scale up to Decimal128 |
| Decimal32 | DecimalType(9, scale) | Yes | Yes | |
| Decimal64 | DecimalType(18, scale) | Yes | Yes | |
| Decimal128 | DecimalType(38, scale) | Yes | Yes | |
| Date, Date32 | DateType | Yes | Yes | |
| DateTime, DateTime32, DateTime64 | TimestampType | Yes | Yes | |
| Array | ArrayType | Yes | No | Array element type is also converted |
| Map | MapType | Yes | No | Keys are limited to StringType |
| IntervalYear | YearMonthIntervalType(Year) | Yes | Yes | |
| IntervalMonth | YearMonthIntervalType(Month) | Yes | Yes | |
| IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | Yes | No | Specific interval type is used |
| Object | | No | | |
| Nested | | No | | |
| Tuple | | No | | |
| Point | | No | | |
| Polygon | | No | | |
| MultiPolygon | | No | | |
| Ring | | No | | |
| IntervalQuarter | | No | | |
| IntervalWeek | | No | | |
| Decimal256 | | No | | |
| AggregateFunction | | No | | |
| SimpleAggregateFunction | | No | | |
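
For instance, the FixedString mapping above is driven by the spark.clickhouse.read.fixedStringAs setting from the Configurations section; a minimal sketch of switching it to string (the table name is hypothetical, and a SparkSession named spark is assumed):

// Read FixedString columns as Spark StringType instead of BinaryType
spark.conf().set("spark.clickhouse.read.fixedStringAs", "string");
Dataset<Row> df = spark.table("clickhouse.default.table_with_fixed_string");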

Inserting data from Spark into ClickHouse

| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes |
|---|---|---|---|---|
| BooleanType | UInt8 | Yes | Yes | |
| ByteType | Int8 | Yes | Yes | |
| ShortType | Int16 | Yes | Yes | |
| IntegerType | Int32 | Yes | Yes | |
| LongType | Int64 | Yes | Yes | |
| FloatType | Float32 | Yes | Yes | |
| DoubleType | Float64 | Yes | Yes | |
| StringType | String | Yes | Yes | |
| VarcharType | String | Yes | Yes | |
| CharType | String | Yes | Yes | |
| DecimalType | Decimal(p, s) | Yes | Yes | Precision and scale up to Decimal128 |
| DateType | Date | Yes | Yes | |
| TimestampType | DateTime | Yes | Yes | |
| ArrayType (list, tuple, or array) | Array | Yes | No | Array element type is also converted |
| MapType | Map | Yes | No | Keys are limited to StringType |
| Object | | No | | |
| Nested | | No | | |

Contributing and Support

If you'd like to contribute to the project or report any issues, we welcome your input! Visit our GitHub repository to open an issue, suggest improvements, or submit a pull request. Please check the contribution guidelines in the repository before starting. Thank you for helping improve the ClickHouse Spark connector!