Single Producer and Multiple Consumers using Multithreading

Harikumar Venkatesan
Greenhorn

Joined: Mar 03, 2011
Posts: 13
Hi,

I am new to the CodeRanch forum. I have 3 years of Java experience, but I have never worked with threads.

Here is a high-level overview of the code I have written.

• An ExecutorService runs the producer, which picks up the records and puts them on a LinkedBlockingQueue.
• As the producer picks up the records, it groups them into multiple array lists of 100000 records each.
• A ThreadPoolExecutor starts multiple consumer threads, and the array lists of records are processed one after another.
• Once all the records in a list have been processed, the loop continues and takes the next list of records.
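
The steps above can be sketched in a compact form. This is only a minimal illustration under stated assumptions, not the code from this post: the class name BatchPipelineSketch, the method process, and the empty-list END marker are all hypothetical, and the "work" done per batch is just counting records. It shows one producer cutting the input into batches, several consumers blocking on take(), and one end marker per consumer so that every consumer stops cleanly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BatchPipelineSketch {

    // Hypothetical end-of-input marker, compared by identity in the consumers.
    private static final List<String> END = new ArrayList<String>();

    // Returns the number of records the consumers saw.
    static int process(final List<String> records, final int batchSize, final int consumers) {
        final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>();
        final AtomicInteger processed = new AtomicInteger();
        ExecutorService producerPool = Executors.newSingleThreadExecutor();
        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);

        // Single producer: queue the batches, then one END marker per consumer.
        producerPool.execute(new Runnable() {
            public void run() {
                try {
                    for (int from = 0; from < records.size(); from += batchSize) {
                        int to = Math.min(from + batchSize, records.size());
                        queue.put(new ArrayList<String>(records.subList(from, to)));
                    }
                    for (int i = 0; i < consumers; i++) {
                        queue.put(END);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Multiple consumers: each blocks on take() until a batch or END arrives.
        for (int i = 0; i < consumers; i++) {
            consumerPool.execute(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            List<String> batch = queue.take();
                            if (batch == END) {
                                return;
                            }
                            processed.addAndGet(batch.size()); // stand-in for real work
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        // shutdown() only stops new submissions; tasks already submitted still run.
        producerPool.shutdown();
        consumerPool.shutdown();
        try {
            consumerPool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<String>();
        for (int i = 0; i < 250; i++) {
            records.add("record-" + i);
        }
        System.out.println("Processed " + process(records, 100, 3) + " records");
    }
}
```

Blocking on take() avoids the poll-and-yield loop, and the executors are shut down only once, after everything has been submitted.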

Issues I'm facing

- If there are 300000 records, the queue holds 100000 records in each entry (the batch size is configurable). My consumer code takes only the first 100000 records, and then my ThreadPoolExecutor (TPE) suddenly shuts down.

There are two class files that I have developed:
1. Reader.java
2. Producer.java

Reader.java takes a file, and Producer.java puts its records on the LinkedBlockingQueue.
While the queue is being filled, I use a ThreadPoolExecutor to create multiple consumers that take the records for processing.

Please find the code attached. It would be great if anyone could help me solve this critical issue. Forgive me for any typos.


Sincerely,
Hari.





Harikumar Venkatesan
Greenhorn

Joined: Mar 03, 2011
Posts: 13
Reader.java
-----------------------

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Reader {

    private static String FOLDER = "C:\\temp";
    private int total = 0;

    public Reader() {
        super();
    }

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {

        String files = null;
        Reader reader = new Reader();
        File folder = new File(FOLDER);
        File[] listOfFiles = folder.listFiles();
        ExecutorService es = Executors.newSingleThreadExecutor();
        LinkedBlockingQueue<ArrayList<String>> q = new LinkedBlockingQueue<ArrayList<String>>();
        for (int i = 0; i < listOfFiles.length; i++) {
            if (listOfFiles[i].isFile()) {
                files = listOfFiles[i].getAbsolutePath();
                reader.startProcess(files, es, q);
            }
        }
        // end of main()
    }

    private void startProcess(String files, ExecutorService es, LinkedBlockingQueue<ArrayList<String>> q) throws IOException, InterruptedException, ExecutionException {
        log("Starting the process....." + files + "..................");
        Producer input = null;
        //Consumer output = null;
        try {
            Reader reader = new Reader();
            input = new Producer(q, files);
            es.execute(input);
            //Creating multiple consumers using TPE and executing them
            reader.threadPoolExecutorProcess(q);

        } catch (IOException e3) {
            System.out.println(e3.getMessage());
            return;
        }
        System.out.println("Running ....");
        // This will make the executor accept no new tasks
        // and finish all tasks that were already submitted
        es.shutdown();
        // Wait until all tasks have finished (blocks instead of spinning in an empty loop)
        es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    public void log(String str) {
        System.out.println(getDate() + ":" + str);
    }

    public String getDate() {
        return new SimpleDateFormat("yyyy-MM-dd HH.mm.ss.S").format(new java.util.Date());
    }

    //Creating a ThreadPoolExecutor with corePoolSize 50 (min threads), maximum 100 threads (maxPoolSize),
    //a keep-alive time of 10000 ms, an unbounded work queue and a custom ThreadFactory
    /*****
    1) If the number of threads is less than the corePoolSize, create a new Thread to run a new task
    2) If the number of threads is equal (or greater than) the corePoolSize, put the task into the queue
    3) If the queue is filled, and the number of threads is less than the maxPoolSize, create a new thread to run tasks in.
    4) If the queue is filled, and the number of threads is greater than or equal to maxPoolSize, reject the task.
    Note: the work queue used below is unbounded and never fills, so rules 3 and 4 never come into play for this pool.
    ******/
    private void threadPoolExecutorProcess(LinkedBlockingQueue<ArrayList<String>> q) throws InterruptedException, IOException {
        // BufferedWriter out = new BufferedWriter(new FileWriter("C:\\temp\\output.txt"));
        System.out.println("Executing Consumer......");
        log("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<Starting File Execution>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
        ArrayList<String> p = null;
        String record = null;
        boolean busy = true;
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(50, 100, 10000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFact());
        Thread monitor = new Thread(new MonitorThread(tpe));
        monitor.setDaemon(true);
        monitor.start();
        try {

            while (busy) {
                p = q.poll();
                if (p == null) {
                    Thread.yield();
                    continue;
                }
                Iterator<String> ir = p.iterator();
                while (ir.hasNext()) {
                    record = ir.next();
                    ir.remove();
                    if (record == null) {
                        // A null record is the producer's end-of-input marker
                        busy = false;
                        break;
                    } else {
                        //getting the data from the queue and giving it to the consumer thread pool
                        tpe.execute(new ConsumerThread("Data" + record));
                        total++;
                    }
                }
                Thread.sleep(2000);
                p = null;
            }
        } catch (RejectedExecutionException e) {
            // This exception comes up when a task cannot be accepted
            // (the queue limit is reached or the executor has been shut down).
            // No new tasks are put in the queue.
            System.out.println("SORRY !!! Unable to execute task");
            //e.printStackTrace();
        }
        System.out.println("Total records processed>>>>>>>>>>" + total);
        log("<<<<<<<<<<<<<<<<<<<<<<<<<<<Process Completed>>>>>>>>>>>>>>>>>>>>>>>>>>>");
        Thread.sleep(3000);
        tpe.shutdown();
    }

    //This is the thread factory that can be used to create consumer threads.
    class ThreadFact implements ThreadFactory {

        public Thread newThread(Runnable arg0) {
            // TODO Auto-generated method stub
            return new Thread(arg0);
        }
    }

    //This is the actual task that does the execution
    class ConsumerThread implements Runnable {

        private String data;

        public ConsumerThread(String data) {
            this.data = data;

        }

        public synchronized void run() {
            System.out.println("Starting Thread with data:" + data);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    //This gives the statistics of the ThreadPoolExecutor so that
    //we can see the creation of new threads and closing of idle threads.
    class MonitorThread implements Runnable {

        private ThreadPoolExecutor tpe;

        public MonitorThread(ThreadPoolExecutor tpe) {
            this.tpe = tpe;
        }

        public void run() {
            // TODO Auto-generated method stub
            while (true) {
                try {
                    System.out.println("Stats---->Active Consumer Thread Count:" + tpe.getActiveCount() + " Total Count:" + tpe.getPoolSize() + " Consumer Queue Size:" + tpe.getQueue().size());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    // end of class Reader
}
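
The four ThreadPoolExecutor rules quoted in the comment above threadPoolExecutorProcess only produce extra threads when the work queue is bounded. The following is a minimal sketch to check that, assuming small illustrative sizes (core 2, max 4) instead of the 50 and 100 used in Reader.java; the class PoolGrowthSketch and the method largestPoolSize are hypothetical names and not part of the posted code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {

    // Submits `tasks` blocking tasks and returns the largest pool size reached.
    // A queueCapacity of 0 or less means "use an unbounded queue".
    static int largestPoolSize(int core, int max, int queueCapacity, int tasks) {
        LinkedBlockingQueue<Runnable> workQueue = queueCapacity > 0
                ? new LinkedBlockingQueue<Runnable>(queueCapacity)
                : new LinkedBlockingQueue<Runnable>();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(core, max, 10, TimeUnit.SECONDS, workQueue);

        // Every task waits on this latch, so no worker becomes free while we submit.
        final CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        int largest = pool.getLargestPoolSize();
        release.countDown(); // let all tasks finish
        pool.shutdown();     // queued tasks still run after shutdown()
        return largest;
    }

    public static void main(String[] args) {
        // Unbounded queue: rule 2 always applies, the pool stays at corePoolSize.
        System.out.println("unbounded queue: " + largestPoolSize(2, 4, 0, 7)); // prints 2
        // Queue of 3: tasks 1-2 start core threads, 3-5 are queued, 6-7 start extra threads.
        System.out.println("bounded queue:   " + largestPoolSize(2, 4, 3, 7)); // prints 4
    }
}
```

With the bounded queue in this sketch, an eighth blocked task would be rejected with a RejectedExecutionException, which is rule 4.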


Producer.java
-----------------------

import java.io.*;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

class Producer implements Runnable {

    private LinkedBlockingQueue<ArrayList<String>> q = null;
    private String input_f = null;
    private BufferedReader br = null;
    private int total = 0;

    public Producer(LinkedBlockingQueue<ArrayList<String>> _q, String file_name) throws IOException {
        q = _q;
        input_f = file_name;
        br = new BufferedReader(new FileReader(input_f));
    }

    public int getAnswer() {
        return total;
    }

    public synchronized void run() {
        System.out.println("Executing Producer");
        ArrayList<String> tb = new ArrayList<String>();
        String s = null;
        try {
            while (true) {
                // Read up to 100000 lines into the current batch
                for (int z = 0; z < 100000; z++) {
                    s = br.readLine();
                    if (s == null) {
                        break;
                    }
                    total++;
                    tb.add(s.trim());
                }
                // End of file: leave the loop, the last (partial) batch is queued below
                if (s == null) {
                    break;
                }
                try {
                    q.put(tb);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                }
                tb = new ArrayList<String>();
                Thread.yield();
            }
            // s is null here: the null entry marks the end of input for the consumer loop in Reader
            tb.add(s);
            try {
                //q.offer(tb);
                q.put(tb);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
            br.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
}

Wouter Oet
Saloon Keeper

Joined: Oct 25, 2008
Posts: 2700

Use OneThreadPerQuestion. Continue here.


"Any fool can write code that a computer can understand. Good programmers write code that humans can understand." --- Martin Fowler
Please correct my English.
 
 