Ethereum Spark Connector

This library lets you:

  • expose the Ethereum tables provided by eth-jdbc-driver as Spark RDDs,
  • query large block ranges in a distributed environment, and
  • execute arbitrary SQL queries in your Spark applications.

Building ethereum-spark-connector

  • Download the source code, or clone it with git clone https://github.com/Impetus/eth-jdbc-connector.git
  • Build it with mvn clean install

Getting Started

To use the Ethereum Spark connector in a Maven project, add the following dependency (define ethsparkconnector.version as a Maven property, or replace it with the version you built above):

<dependency>
  <groupId>com.impetus.eth</groupId>
  <artifactId>eth-spark-connector</artifactId>
  <version>${ethsparkconnector.version}</version>
</dependency>

Creating the RDD and Data Frame with eth-spark-connector

To load an Ethereum table in Spark, follow these steps:

  1. Create the ReadConf object
    import org.apache.spark.sql.eth.EthSpark.implicits._
    
    val readConf = ReadConf(<splitCount>, <fetchSizeInRows>, "select * from block")
  • The first argument is splitCount, the number of partitions, e.g. Some(10) or None.
  • The second argument is fetchSizeInRows per partition, e.g. Some(1000) or None.
  • The third argument is the SQL query; add a where clause to keep the result set small.
  • The import of org.apache.spark.sql.eth.EthSpark.implicits brings the DefaultPartition implicits into scope.

Note: Use a simple eth-jdbc query with a where clause, and then apply RDD/DataFrame group-by or other aggregation features for an optimized outcome, as in the sketches below.
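
As a concrete illustration, a ReadConf that scans a bounded block range over ten partitions might look like the following sketch. The block-number bounds and the blocknumber column name are assumptions about the block table schema, so adjust them to what eth-jdbc-driver actually exposes.

    import org.apache.spark.sql.eth.EthSpark.implicits._
    
    // 10 partitions, fetching 1000 rows per partition; the where clause
    // bounds the scan to an illustrative block range (assumed column name)
    val readConf = ReadConf(Some(10), Some(1000),
      "select * from block where blocknumber > 1000000 and blocknumber < 1000100")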

  2. Load an RDD with EthSpark, or use the Spark session to load a DataFrame

    import org.apache.spark.sql.Row
    
    // Create an RDD
    val rdd = EthSpark.load[Row](<sparkContext>, readConf, Map("url" -> "jdbc:blkchn:ethereum://ropsten.infura.io/1234"))
    
    // Create a DataFrame
    val option = readConf.asOptions() ++ Map("url" -> "jdbc:blkchn:ethereum://ropsten.infura.io/1234")
    val df = spark.read.format("org.apache.spark.sql.eth").options(option).load()
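
Once loaded, the DataFrame behaves like any other Spark DataFrame, which is where the aggregation advice in the note above applies. A minimal sketch, assuming only the temp-view name chosen here and nothing about the schema beyond what printSchema reports:

    // Inspect the columns the connector exposes
    df.printSchema()
    
    // Register a temp view and aggregate in Spark rather than in the eth-jdbc query
    df.createOrReplaceTempView("block_data")
    spark.sql("select count(*) as blocks from block_data").show()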

Saving a Data Frame to initiate insert transactions

This gives the sender the flexibility to initiate multiple transactions in a single call by creating and saving a Data Frame. The Data Frame must have four columns: the first for the destination address, the second for the ether value to transfer, the third for the unit, and the fourth for the async flag. A filled-in sketch follows the template below.

    var dataFrameToStore = spark.createDataFrame(Seq(
        ("<address>", <ethers>, <unit>, <async>),
        ("<address>", <ethers>, <unit>, <async>)
      )).toDF("toaddress", "value", "unit", "async")
    
    EthSpark.save(dataFrameToStore, Map(
        "url" -> "jdbc:blkchn:ethereum://127.0.0.1:8545",
        "KEYSTORE_PATH" -> "<Path To Keystore>",
        "KEYSTORE_PASSWORD" -> "<password>"))