Streaming processing (III): Best Spark Practice

The previous article illustrated some lessons learned when applying Kafka to streaming data processing; it extended an earlier article covering the basic concepts, setup, and integration of Kafka, Spark Streaming, Confluent Schema Registry, and Avro. This post is devoted to best practices for Spark Streaming operations (e.g., transform, broadcast variables), serialization and deserialization, and unit testing of Spark streaming jobs.

Related articles

  1. Streaming processing (III): Best Spark Practice
  2. Streaming processing (II): Best Kafka Practice
  3. Streaming processing (I): Kafka, Spark, Avro Integration

Package and versions

  1. The following packages need to be prepared.

    Packages        Version
    mvn             3.3.9
    gradle          3.3
    flume           1.8.0
    spark           1.6.2
    schemaregistry  3.1.2
  2. They can be compiled and built via

    • maven

      mvn package -Dmaven.test.skip=true
    • gradle

      gradle build -x test

Spark Stream

  1. On a conceptual level, the following Spark Stream example will

    • Read data stream from a Kafka topic. The stream will be in binary or byte array format before deserialization and decoding.
    • Deserialize and decode the stream using the Kryo class.
    • Generate a stream of RDD of GenericRecord.
    • Process this RDD stream with transform operation.
    • Generate and serialize an output Kafka stream of <String, GenericRecord> using Kafka producer.
  2. All Spark Streaming operations will be implemented as separate classes to keep the code clean. As an alternative, one can always write these operations as nested function calls.
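The tradeoff between named operation classes and nested calls can be sketched in plain Java. The `ToUpper` stage below is a hypothetical example and uses `java.util.function.Function` rather than Spark's `Function` interface:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class NamedOperations {
    // a named, reusable stage: easy to unit-test and to reuse across jobs
    static class ToUpper implements Function<String, String> {
        public String apply(String s) { return s.toUpperCase(); }
    }

    public static void main(String[] args) {
        List<String> input = List.of("click", "view");
        // named class keeps the pipeline declaration short and readable
        List<String> named = input.stream().map(new ToUpper()).collect(Collectors.toList());
        // the nested alternative inlines the same logic as a lambda
        List<String> nested = input.stream().map(s -> s.toUpperCase()).collect(Collectors.toList());
        System.out.println(named.equals(nested)); // prints true
    }
}
```

Both forms compute the same result; the named class wins once a stage is reused or tested in isolation.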


GitHub is still a good place for the code base. In particular, the Kafka part can be found in my GitHub repository.

  1. Spark configuration with Kryo as the serializer. The data to be received is generated by the Kafka streaming steps described in the previous articles and is essentially a stream of <String, GenericRecord>. As a result, Kryo must be registered in Spark so that deserialization can happen.

    SparkConf sparkConf = new SparkConf()
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{GenericData.Record.class});
  2. Add parameters to property variables. In particular, the ZooKeeper URL and the Schema Registry URL need to be specified.

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", zookeeperURL);
    kafkaParams.put("schema.registry.url", registryURL);
    kafkaParams.put("group.id", groupName);
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapURL);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
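A minimal, self-contained sketch of this configuration step; the URLs and group name are placeholders, and only `java.util.Properties` and `HashMap` are exercised, not Kafka itself:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaConfigSketch {
    public static void main(String[] args) {
        // consumer-side parameters (placeholder URLs and group name)
        String zookeeperURL = "localhost:2181";
        String registryURL  = "http://localhost:8081";
        String groupName    = "spark-group";
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", zookeeperURL);
        kafkaParams.put("schema.registry.url", registryURL);
        kafkaParams.put("group.id", groupName);

        // producer-side properties (placeholder bootstrap URL)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        System.out.println(kafkaParams.get("group.id")); // prints spark-group
    }
}
```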
  3. Receive a stream of RDD<String, GenericRecord> from Kafka stream.

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(batchSize));
    Map<String, Integer> topicMap = new HashMap<>();
    for (String topic : topicIn.split(",")) {
        topicMap.put(topic, numThreads);
    }
    // create key-value stream (KafkaAvroDecoder is Confluent's Avro decoder)
    JavaPairReceiverInputDStream<String, GenericRecord> streamMsg = KafkaUtils.createStream(
            jssc, String.class, GenericRecord.class,
            StringDecoder.class, KafkaAvroDecoder.class,
            kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
  4. Convert the input stream from RDD<String, GenericRecord> to RDD<GenericRecord>. A Spark mapper is needed to map each tuple<key,value> to its value.

    // get record stream
    JavaDStream<GenericRecord> avroInMsg = streamMsg.map(
            new Function<Tuple2<String, GenericRecord>, GenericRecord>(){
                public GenericRecord call(Tuple2<String, GenericRecord> tuple2) throws Exception{
                    return tuple2._2();
                }
            });
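The key-dropping step can be sketched with plain Java pairs, with `Map.Entry` standing in for Spark's `Tuple2` and strings standing in for records:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TupleToValue {
    public static void main(String[] args) {
        // each entry stands in for a Tuple2<String, GenericRecord>
        List<Map.Entry<String, String>> pairs = List.of(
                new SimpleEntry<>("key1", "record1"),
                new SimpleEntry<>("key2", "record2"));
        // keep only the value of each pair, as tuple2._2() does in the mapper
        List<String> values = pairs.stream()
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        System.out.println(values); // prints [record1, record2]
    }
}
```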
  5. Process the stream of RDD<GenericRecord> and generate output Kafka stream of <String, GenericRecord>.

    if (operation == ProcessloguserP) {
        avroOutMsg = avroInMsg.transform( new ProcessLogToP() );
        avroOutMsg.foreachRDD( new ProduceOutputStream(topicOut, schemaNameOut, producerP) );
    } else if (operation == ProcessloguserC) {
        avroOutMsg = avroInMsg.transform( new ProcessLogToC() );
        avroOutMsg.foreachRDD( new ProduceOutputStream(topicOut, schemaNameOut, producerC) );
    }
    • Implement Spark transformation operation

      public class ProcessLogToP implements Function<JavaRDD<GenericRecord>, JavaRDD<GenericRecord>> {
          private static final long serialVersionUID = 1L;
          public JavaRDD<GenericRecord> call(JavaRDD<GenericRecord> rddIn) throws Exception{
              final Broadcast<Map<String,Schema>> schemaList = BCVariables.getInstance(new JavaSparkContext(rddIn.context()));
              JavaRDD<GenericRecord> rddOut = rddIn.map(
                      new Function<GenericRecord, GenericRecord>(){
                          public GenericRecord call (GenericRecord input) throws Exception{
                              Schema schema = schemaList.value().get("P");
                              GenericData.Record output = new GenericData.Record(schema);
                              // copy every field defined in the output schema from the input record
                              for (Schema.Field field : schema.getFields()) {
                                  output.put(field.name(), input.get(field.name()));
                              }
                              return output;
                          }
                      });
              return rddOut;
          }
      }
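The per-record projection inside the mapper can be sketched with plain maps; the field names below are hypothetical, and a `Map` stands in for an Avro `GenericRecord`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProjectionSketch {
    public static void main(String[] args) {
        // input record with one field the target "P" schema does not define
        Map<String, Object> input = new HashMap<>();
        input.put("user", "u1");
        input.put("page", "/home");
        input.put("debug", "dropped");
        // the field list of the target schema (hypothetical names)
        List<String> schemaPFields = List.of("user", "page");
        // build the output record by copying only the schema's fields
        Map<String, Object> output = new HashMap<>();
        for (String field : schemaPFields) {
            output.put(field, input.get(field));
        }
        System.out.println(output.containsKey("debug")); // prints false
    }
}
```

Fields absent from the target schema are silently dropped, which is exactly what the schema-driven copy loop in `ProcessLogToP` achieves.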
    • Generate a Kafka stream of <String, GenericRecord>. We use Twitter’s Bijection package for serialization; alternatively, the Avro Serde can serialize the message directly.

      public class ProduceOutputStream implements Function2<JavaRDD<GenericRecord>, Time, Void> {
          private String topicOut;
          private String schemaNameOut;
          private KafkaProducer<String, byte[]> producer;
          public ProduceOutputStream(String topicOut, String schemaNameOut, KafkaProducer<String, byte[]> producer){
              this.topicOut      = topicOut;
              this.schemaNameOut = schemaNameOut;
              this.producer      = producer;
          }
          public Void call(JavaRDD<GenericRecord> rdd, Time time) throws Exception{
              long startTime = 0;
              byte[] bytes   = null;
              ProducerRecord<String, byte[]> data = null;
              List<GenericRecord> records = null;
              Broadcast<Map<String,Schema>> schemaList = BCVariables.getInstance(new JavaSparkContext(rdd.context()));
              Injection<GenericRecord, byte[]> outInjection = GenericAvroCodecs.toBinary(schemaList.value().get(schemaNameOut));
              if (rdd != null){
                  records = rdd.collect();
                  for (GenericRecord record : records){
                      bytes = outInjection.apply(record);
                      data = new ProducerRecord<>(topicOut, bytes);
                      startTime = System.currentTimeMillis();
                      producer.send(data, new KafkaProducerCallback(startTime));
                  }
                  System.out.println("----- Message processed: " + rdd.count());
              }
              return null;
          }
      }
    • Variables should be passed to each mapper using the broadcast mechanism. In particular, the schema definitions are passed to the mappers; we therefore use CachedSchemaRegistryClient to fetch schemas from, and register them with, the Schema Registry server when applicable.

      public class BCVariables{
          private static volatile Broadcast<Map<String,Schema>> instance = null;
          public static Broadcast<Map<String,Schema>> getInstance(JavaSparkContext jsc){
              Map<String, Schema> ficoSchemaList = new HashMap<>();
              int schemaId;
              String registryURL   = "http://localhost:8081";
              Schema.Parser parser = new Schema.Parser();
              CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryURL,20);
              if(instance == null){
                  synchronized (BCVariables.class){
                      if (instance == null){
                          Schema SchemaP     = null;
                          Schema SchemaC     = null;
                          // fetch schema P from the registry, or fall back to the local definition
                          try{
                              schemaId = client.getLatestSchemaMetadata("P").getId();
                              SchemaP = client.getByID(schemaId);
                          }catch (Exception ex){
                              SchemaP = parser.parse(SchemaDef.AVRO_SCHEMA_P);
                              try{
                                  schemaId = client.register("P",SchemaP);
                              }catch(Exception e){
                                  // registry unavailable; keep the locally parsed schema
                              }
                          }
                          // fetch schema C from the registry, or fall back to the local definition
                          try{
                              schemaId = client.getLatestSchemaMetadata("C").getId();
                              SchemaC = client.getByID(schemaId);
                          }catch (Exception ex){
                              SchemaC = parser.parse(SchemaDef.AVRO_SCHEMA_C);
                              try{
                                  schemaId = client.register("C",SchemaC);
                              }catch(Exception e){
                                  // registry unavailable; keep the locally parsed schema
                              }
                          }
                          ficoSchemaList.put("P",    SchemaP);
                          ficoSchemaList.put("C",    SchemaC);
                          instance = jsc.broadcast(ficoSchemaList);
                      }
                  }
              }
              return instance;
          }
      }
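The double-checked locking pattern used by BCVariables can be reduced to a plain-Java sketch: a generic lazily initialized singleton, with a simple map (placeholder payloads) standing in for the Spark broadcast variable:

```java
import java.util.HashMap;
import java.util.Map;

public class LazySingleton {
    // volatile ensures the fully constructed map is visible to all threads
    private static volatile Map<String, String> instance = null;

    public static Map<String, String> getInstance() {
        if (instance == null) {                     // first check, no lock
            synchronized (LazySingleton.class) {
                if (instance == null) {             // second check, under lock
                    Map<String, String> m = new HashMap<>();
                    m.put("P", "schemaP");          // placeholder payloads
                    m.put("C", "schemaC");
                    instance = m;                   // publish exactly once
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        // repeated calls return the same instance without re-locking
        System.out.println(getInstance() == getInstance()); // prints true
    }
}
```

The first unsynchronized check avoids taking the lock on every call; the second, locked check guarantees the expensive initialization (here, schema lookup and broadcast) happens only once.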

Unit test

The example is based on running a unit test for the Spark Streaming transform operation. Other operations can be tested in a similar fashion.

  1. Setup test by defining Spark context.

    public void setUp() {
        conf = new SparkConf().setAppName("test").setMaster("local[*]");
        ssc  = new JavaStreamingContext(conf, new Duration(1000L));
    }
  2. Shut down the test properly by closing the Spark context. This allows running multiple unit tests during a test cycle.

    public void tearDown() {
        ssc.stop();
        ssc = null;
    }
  3. The actual test illustrated here is nothing more than a template; actual test cases still need to be filled in.

    • Setup testing cases

      Schema.Parser parser = new Schema.Parser();
      Schema schema = parser.parse(SchemaDef.AVRO_SCHEMA_OUTLog);
      GenericRecord record = new GenericData.Record(schema);
      for (Schema.Field field : schema.getFields()) {
          record.put(field.name(), "1");
      }
      List<GenericRecord> inputList = new ArrayList<>();
      inputList.add(record);
      Queue<JavaRDD<GenericRecord>> rddQueue = new LinkedList<>();
      rddQueue.add(ssc.sparkContext().parallelize(inputList));
      JavaDStream<GenericRecord> inputStream = ssc.queueStream(rddQueue);
    • Execute the operation under test, ProcessLogToP(), on the input data just generated and retrieve the results.

      JavaDStream<GenericRecord> outputStream = inputStream.transform(new ProcessLogToP());
    • Compare the actual results with the expected results using assertions.
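The assertion step can be sketched by comparing a collected output list against an expected list; the expected values below are hypothetical stand-ins for the records produced from the "1"-filled input:

```java
import java.util.List;

public class AssertSketch {
    public static void main(String[] args) {
        // expected records for the test input (hypothetical values)
        List<String> expected = List.of("1", "1");
        // pretend these were collected from the output stream under test
        List<String> actual = List.of("1", "1");
        // in JUnit this comparison would be assertEquals(expected, actual)
        System.out.println(expected.equals(actual)); // prints true
    }
}
```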


We have discussed some best practices for connecting Spark Streaming to a Kafka KStream, applying Spark Streaming operations (e.g., transform), and sending an output Kafka stream from Spark.

Hongyu Su 26 February 2017 Helsinki