A performance-tuning exercise: from 20 hours to 8 minutes

Aug. 14, 2017

Background

In the production environment, the DCC-Release ID job takes more than 20 hours to finish. The bottleneck is the fact that, for every data point, the ID job needs to talk to the DCC-ID service through an HTTP request to either obtain or create an ID. This means every run of the ID job issues millions of ID lookup requests, which proves challenging even though the ID service runs multiple instances and its Varnish cache runs on a dedicated server with 128 GB of RAM.

Solution

The ID service already has a “**/export” endpoint for each ID type (mutation, sample, donor, and specimen), so the original idea was to cache the exported data locally at the worker nodes for lookups, and only make an HTTP request when a new ID needs to be generated.
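As a rough illustration, a worker could build an in-memory map from the export and fall back to an HTTP lookup only on a miss. This is only a sketch: the class name IdCache, the endpoint URL, and the tab-separated “key, then ID” line format are assumptions for illustration, not the actual DCC-ID wire format.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

// Build a local lookup table from an export endpoint.
public final class IdCache {

    public static Map<String, String> load(String exportUrl) throws Exception {
        Map<String, String> cache = new HashMap<>();
        HttpURLConnection conn = (HttpURLConnection) new URL(exportUrl).openConnection();
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2); // assumed format: key, then ID
                if (parts.length == 2) {
                    cache.put(parts[0], parts[1]);
                }
            }
        } finally {
            conn.disconnect();
        }
        return cache;
    }
}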

The first trial was Spark broadcast variables (https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables), which give every node a copy of a large input dataset in an efficient manner.
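A minimal sketch of the broadcast approach with the Java API, assuming jsc is the existing JavaSparkContext, donorIds is the map loaded from the export endpoint, and donorKeys is an RDD of business keys (these names are mine, not from the actual job):

import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;

// Ship the donor ID map to every executor once, instead of issuing one
// HTTP lookup per record.
final Broadcast<Map<String, String>> idsBc = jsc.broadcast(donorIds);

// Executors read the broadcast copy locally; a miss means a new ID has
// to be created through the ID service (omitted here).
JavaRDD<String> resolved = donorKeys.map(key -> idsBc.value().get(key));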

However, I got OutOfMemoryError exceptions. I checked the “mutation_ids” table: there are over 80 million records, amounting to roughly 8 GB of data, so holding the mutation ID data in memory alone needs around 8 GB. After giving the Spark driver more memory, I got another exception: a connection timeout. The “***/export” endpoint uses the PostgreSQL COPY command to export the data, and exporting 8 GB of data and transferring it over HTTP to the Spark driver definitely takes too long, which explains the timeout.

So far, Spark broadcast worked well for Donor, Sample, and Specimen, but failed for Mutation. So the focus moved to how to load the large Mutation dataset.

Spark provides a rich set of APIs to load different kinds of data from different data sources. The second trial was to load the data directly from PostgreSQL through Spark's JDBC data source, sqlContext.read().jdbc(…). Unfortunately it failed again; the size of the dataset was still the root cause.
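A sketch of what that load would look like with the Spark 1.x DataFrame API; the connection details are placeholders:

import java.util.Properties;
import org.apache.spark.sql.DataFrame;

// Read mutation_ids straight from PostgreSQL through Spark's JDBC data source.
Properties props = new Properties();
props.setProperty("user", "...");
props.setProperty("password", "...");

DataFrame mutationIds = sqlContext.read()
    .jdbc("jdbc:postgresql://<host>/<db>", "mutation_ids", props);

Worth noting: without the partitioning overload of jdbc(…), Spark pulls the whole table through a single connection into a single partition, which only makes the size problem worse.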

At this point, everything seemed stuck, so I decided to think about the problem bottom-up. The whole process, where the ID job talks to the ID service to obtain or create an ID for each data point, looks like a typical JOIN operation in the SQL world. That means I could transform both datasets (from HDFS and PostgreSQL) into Spark DataFrames and perform a left join. And if loading the Mutation data directly from PostgreSQL won't work, how about dumping it to HDFS first, then loading it from there?

The third trial was to dump the “mutation_ids” table to HDFS through the CopyManager provided by the PostgreSQL JDBC driver, then load the text-format data through SparkContext.textFile(…). The code looks like the following:

// Prepare the Mutation ID DataFrame: load the dump from HDFS,
// parse each line, and cache the result
DataFrame id_df =
    sqlContext.createDataFrame(
        sparkContext.textFile(${dump_file_path}).map(...)
    ).cache();
// Transform the raw HDFS data into a DataFrame (input is an RDD)
DataFrame raw_df =
    sqlContext.createDataFrame(
        input.map(...)
    );
// Left-join the two DataFrames on the join fields
String[] fields = ${join fields};
raw_df.join(id_df, fields, "left_outer").mapPartitions(...);
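
For completeness, the dump step that feeds sparkContext.textFile(…) might look like the following sketch; the jdbcUrl, user, password, and dumpFilePath variables are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

// Stream mutation_ids out of PostgreSQL with COPY and write it straight to HDFS.
try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = fs.create(new Path(dumpFilePath))) {
        CopyManager copyManager = conn.unwrap(PGConnection.class).getCopyAPI();
        copyManager.copyOut("COPY mutation_ids TO STDOUT", out);
    }
}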

The final solution uses Spark broadcast for Donor, Sample, and Specimen, and a Spark SQL DataFrame join for Mutation. In the production environment, the time consumed across all of the projects dropped from more than 20 hours to 8 minutes.

Conclusion

The final performance surprised me as well. It demonstrates that Spark works very well on large-scale datasets and that its SQL optimization is also very good.

A DataFrame or Dataset is preferred when manipulating large-scale datasets.

Communication is very important. I got the inspiration for dumping the PostgreSQL data from Robert and Junjun. I appreciate the help.

Next Steps

There are still some potential improvements we could make to the ID job and the ID service. For example:

The ID service's lookup interface is HTTP-based, which is not as efficient as an internal service could be. We could use an RPC solution (say gRPC) with the support of Protocol Buffers, or Thrift.

The ID service uses Tomcat as its HTTP server, which is the default option in Spring Boot. Although Tomcat has a thread pool to handle connections, the basic servlet model is still synchronous. We could use an asynchronous framework such as Netty. Furthermore, we could even use asynchronous drivers to access PostgreSQL.

Task scheduling in the ID job could also be optimized. So far, in the production environment we have over 70 projects (data sources) and 4 types of task (for Donor, Sample, Specimen, and Mutation respectively), which means that once the ID job starts, about 70 × 4 threads contend for the Spark resources, because there is no control over the size of the thread pool. Although our Spark cluster is powerful enough, we could apply more fine-grained control, as sketched below.
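A minimal sketch of one way to bound that contention, using a fixed-size executor; the pool size and the idTasks collection are arbitrary examples, not the job's actual code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Run the per-project/per-type tasks on a bounded pool instead of one
// thread per task, so only a fixed number contend for Spark resources.
ExecutorService pool = Executors.newFixedThreadPool(16);
for (Runnable task : idTasks) { // idTasks: the ~70 * 4 project/type tasks
    pool.submit(task);
}
pool.shutdown();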

Grant Guo, Senior Software Developer
Passionate back-end developer interested in applying distributed data-computation software to real-world domains, forever curious about new things.