Skip to content

Latest commit

 

History

History
93 lines (67 loc) · 2.69 KB

README.md

File metadata and controls

93 lines (67 loc) · 2.69 KB

package com.mycom.kafkatest.generator;

import java.util.*;

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;

import org.apache.hadoop.conf.; import org.apache.hadoop.fs.;

public class HDFSKProd {

public static void main(String[] args) {

    String topic = args[1];
    int batchSize = Integer.parseInt(args[2]);
    long sleepIntervalMillis = Long.parseLong(args[3]);
    int maxTrans = Integer.parseInt(args[4]);
    
    FSDataInputStream  eventStream = null;

    Properties kafkaProps = new Properties();
    KafkaProducer producer;
    
    kafkaProps.put("bootstrap.servers", "brokerHost:9092");
    kafkaProps.put("metadata.broker.list", "brokerHost:9092");

    // This is mandatory, even though we don't send keys
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaProps.put("acks", "0");

    // how many times to retry when produce request fails?
    kafkaProps.put("retries", "3");
    kafkaProps.put("linger.ms", 2);
    kafkaProps.put("batch.size", 1000);
    kafkaProps.put("queue.time", 2);

    producer = new KafkaProducer(kafkaProps);
   
    if (eventStream == null) {
      try {
          Configuration conf = new Configuration();
          ("fs.hdfs.impl.disable.cache",true);
          Path pt = new Path(args[0]);
          
          FileSystem hdfs  = FileSystem.get(conf);
          
          eventStream = hdfs.open(pt);
          
      }
      catch (IOException e){
            e.printStackTrace();;
      }
    }

    int i = 0;
    try {
       byte buffer[] = new byte[800];

       while (eventStream.read(i*800,buffer,0,800) > 0) {
           i++;
          
           ProducerRecord record = new ProducerRecord(topic, buffer );
           producer.send(record);
           
           if (i > maxTrans) {
        	   System.out.println("Exiting at i= " + i);
        	   break;
           }
           if (i % batchSize == 0) {
        	   System.out.println("Sleeping at i= " + i);
        	   try {
        		   Thread.sleep(sleepIntervalMillis);
        	   } catch(InterruptedException e) {
        		   e.printStackTrace();
        	   }
           }

       }

       eventStream.close();

       producer.close();
       } catch (IOException e) {
               e.printStackTrace();
       }
}

}