Process the multiple records in a file by Producer/consumer concept using Multithreading

 
Greenhorn
Posts: 13
Hi,

I am new to the CodeRanch forum. I have 3 years of Java experience and have never worked with threads.

Here is a high-level overview of the code I have created.

• Create an ExecutorService for the producer to pick up the records, using a LinkedBlockingQueue.
• Once the producer has taken up the records, it forms multiple ArrayLists, each holding 100,000 records.
• Using a ThreadPoolExecutor, multiple consumer threads are triggered and process the ArrayLists of records sequentially.
• Once all records have been processed, the loop continues with the upcoming records.
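Roughly, the shape of this pipeline can be sketched as below. This is a minimal standalone sketch, not the attached code: the batch size, the simulated record source, and the poison-pill shutdown are all stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineSketch {
    static final int RECORDS_PER_BATCH = 100;            // 100,000 in the real app
    static final List<String> POISON = new ArrayList<>(); // sentinel to stop consumers

    public static void main(String[] args) throws Exception {
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(10);
        AtomicInteger processed = new AtomicInteger();
        int consumerCount = 4;
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(() -> {
                try {
                    while (true) {
                        List<String> batch = queue.take();
                        if (batch == POISON) break;        // stop signal received
                        processed.addAndGet(batch.size()); // stand-in for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Producer: build batches and put them on the queue as records arrive
        List<String> batch = new ArrayList<>();
        for (int r = 0; r < 1000; r++) {                   // simulated records
            batch.add("record-" + r);
            if (batch.size() == RECORDS_PER_BATCH) {
                queue.put(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) queue.put(batch);
        for (int i = 0; i < consumerCount; i++) queue.put(POISON); // one sentinel per consumer
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("processed=" + processed.get());
    }
}
```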

Issues I'm facing

- If there are 300,000 records, the queue takes 100,000 records in each index (configurable), but my consumer code takes up only the first 100,000 records and then my ThreadPoolExecutor suddenly shuts down.

There are two class files that I have developed:
1. Reader.java
2. Producer.java

Reader.java takes a file, and Producer.java puts the records into the LinkedBlockingQueue.
Once the queue is filled, I use a ThreadPoolExecutor to create multiple consumers that take up the records for processing.

Please find the attached code. It would be great if anyone could help solve my critical issue. Forgive me if there are any typos.


Sincerely,
Hari.

Reader.java
-----------------



Producer.java
------------------





 
Bartender
Posts: 2700
IntelliJ IDE Opera
What is TPE? And please UseCodeTags. You can edit your post with the button.
 
Harikumar Venkatesan
Greenhorn
Posts: 13
TPE --> Thread Pool Executor.

Sincerely,
Hari.
 
Marshal
Posts: 28177
Eclipse IDE Firefox Browser MySQL Database
You say "get shutting down". Is that a normal termination? Or is there an exception thrown?

And how can you tell whether all of the records have been processed or not? (If the answer is in that code, I apologize, but there was really too much code for me to read.)
 
Harikumar Venkatesan
Greenhorn
Posts: 13
I have declared the variable total, but that line is never printed. No exception is thrown, but in the middle of processing my JVM shuts down.

Please let me know in case of any further clarifications.
 
Paul Clapham
Marshal
Posts: 28177
Eclipse IDE Firefox Browser MySQL Database
Well, sorry. There aren't any comments in that program (except comments about the queuing) so I have no idea what gets run and when and how.
All I can recommend at this point is that you simplify the program to the point that a second person could actually understand it.
 
Bartender
Posts: 1210
Android Python PHP C++ Java Linux
Hi Harikumar,

The
- design of this app,
- the way it's creating (multiple) thread pool executors,
- the seemingly unnecessary Thread.yield() calls (premature optimization? Besides, the executor service was introduced to separate the concern of what is done from the concerns of when and how it's done... but this separation is lost if things like yield are used)...

All these can be improved, but it's difficult to explain the hows if the end goal and constraints (for example, why do lines need to be consumed sequentially?) are not clear.

So, before getting lost in the implementation swamp, can you give an idea of this application's end goal? If you were asked by a non-programmer "what does this application do?", what would be your answer?

Best Regards
Karthik
 
Harikumar Venkatesan
Greenhorn
Posts: 13
Hi Karthik,

Sorry for the confusion.

The main requirement is:

1. Get the list of files in a folder; each file has 1 million records.
2. Read each record in a particular file and send it to the database.
3. Likewise process all the records in a file and move on to the next file.

For the above, I designed it using the producer/consumer concept with the Java concurrency API.

As I posted before, Reader.java reads the files and calls Producer.java; once all the records have been pushed to the LinkedBlockingQueue (100,000 records in each index of the array), the ThreadPoolExecutor starts taking up the records for processing. That's it from a coding perspective.


Issues I'm currently facing:

- I have to wait for the producer to process all 1 million records before the consumer starts processing.
- My consumer shuts down abruptly while many records are still left.

Need to fix:

- Once the producer starts reading records, I need to start the ThreadPoolExecutor right away and notify the producer as well.
- If possible, we could add multiple producers as well.
- Without the concurrency API I can process 30k records in 2 minutes 45 seconds; since concurrency is more advanced, I opted to develop this code.

Sorry for the long explanation.

Waiting for your favorable reply.

Sincerely,
Hari.




 
Harikumar Venkatesan
Greenhorn
Posts: 13
Can anyone please help with this threading issue?

 
Karthik Shiraly
Bartender
Posts: 1210
Android Python PHP C++ Java Linux
Hi Harikumar,

Threading is a tough area. No need to apologise, that too to me! I'm no code gestapo, and we're all learners here in one area or another.

My question arose because in the original code listing, the consumer just prints the record to console, which can't have been the end goal; hence I asked what's the actual end goal. You've now clarified that end goal is to transfer records from files to DB, which helps a lot.

What I still haven't understood is, why is it necessary to finish reading all 1 million records of a file before submitting them in one go, and only then moving on to another file? I don't know what these files are. Can you tell whether these 2 constraints - 1) read all records first 2) finish one file then move to another - are really necessary?

If those are not necessary, then my keep-it-simple code would be (in pseudocode):
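Something like this minimal standalone sketch: one task per file, each task streaming records to the DB as it reads. The `writeToDb` method here is a placeholder that just counts records, and the sample files are generated on the fly so the sketch runs by itself.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class SimpleLoader {
    static final AtomicLong written = new AtomicLong();

    // Placeholder for the real DB insert (e.g. a JDBC INSERT per record)
    static void writeToDb(String record) {
        written.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("records");  // stand-in for the data folder
        for (int f = 0; f < 3; f++) {                     // create 3 small sample files
            Files.write(dir.resolve("file" + f + ".txt"),
                        Arrays.asList("a", "b", "c", "d"));
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                pool.submit(() -> {                        // one task per file
                    try {
                        for (String line : Files.readAllLines(file)) {
                            writeToDb(line);               // consume while reading; no waiting
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("records written: " + written.get());
    }
}
```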


Don't do all the q.poll() and Thread.yield() calls.
Measure this implementation's performance: it's the baseline for the concurrent approach. It also solves the problem of not waiting for all records to be read.

Many possible optimizations are immediately obvious in this:
1. Instead of sending 1 SQL query for each record, we can speed things up by combining multiple records in a single INSERT statement to the DB, or by combining them into a batch statement.

2. Another optimization is to use just one additional ThreadPoolExecutor - executor 2.
Each thread of executor1 reads n lines, and submits those n lines to executor2. Threads of executor2 then pick up these small batches of n lines and write them in batch to DB.
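The second optimization might look like the sketch below. The DB write is replaced by a counter so it runs standalone, and the batch size and pool sizes are arbitrary guesses to be tuned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoStagePipeline {
    static final int BATCH = 5;  // records per DB batch (tune this)
    static final AtomicInteger batchesWritten = new AtomicInteger();

    // Placeholder for a JDBC batch insert (addBatch()/executeBatch())
    static void writeBatchToDb(List<String> batch) {
        batchesWritten.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService readers = Executors.newFixedThreadPool(2); // executor 1
        ExecutorService writers = Executors.newFixedThreadPool(2); // executor 2
        readers.submit(() -> {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < 23; i++) {                 // simulated lines from a file
                batch.add("line-" + i);
                if (batch.size() == BATCH) {
                    List<String> full = batch;
                    writers.submit(() -> writeBatchToDb(full)); // hand off to stage 2
                    batch = new ArrayList<>();
                }
            }
            if (!batch.isEmpty()) {
                List<String> rest = batch;                  // leftover partial batch
                writers.submit(() -> writeBatchToDb(rest));
            }
        });
        readers.shutdown();
        readers.awaitTermination(1, TimeUnit.MINUTES);      // all batches submitted by now
        writers.shutdown();
        writers.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("batches: " + batchesWritten.get());
    }
}
```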


For optimal processing time, each core should always be executing a thread that is not blocked at that moment (whether due to disk I/O while reading files or I/O while writing to the DB). This is very system specific and requires you to try out different values for the thread pool size and for the record batch size.
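As a starting point for pool sizing, a common rule of thumb (only a starting value, not the optimum for any particular system) is to scale from the core count:

```java
public class PoolSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // For I/O-heavy work, more threads than cores can help hide blocking;
        // a common starting point is 2x cores, then measure and adjust.
        int poolSize = cores * 2;
        System.out.println("cores=" + cores + " poolSize=" + poolSize);
    }
}
```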

A similar question came up in another forum, and the people there swore by 2 frameworks for such concurrent processing:
- One is Spring Batch.
- The other is Apache Hadoop, the sledgehammer of data processing.
I'm not familiar with either at this point. I'm mentioning them here just in case simple Java thread executors don't give you the necessary performance and you have some servers to throw at this processing; then you can look into these alternative solutions.

Best Regards
Karthik

 
Harikumar Venkatesan
Greenhorn
Posts: 13
Thanks Karthik for your quick reply.

1. There's no need to wait until the entire file is read; we can start processing those records simultaneously.
2. Yes, once the first file has some records done (meaning inserted into the DB), I need to take up the next file for processing.

I have gone through your pseudocode and implemented it.

It's processing the records faster, but in the thread pool only Pool-1, Thread-1 is running. Because of the single thread, it takes many minutes to process the 1 million records.

It's not creating the threads in parallel to process the records.

If needed I will paste the code I developed from your pseudocode.

Sincerely,
Hari.
 
Harikumar Venkatesan
Greenhorn
Posts: 13
Code:





 
Karthik Shiraly
Bartender
Posts: 1210
Android Python PHP C++ Java Linux
Hi Harikumar,

I ran this code as is, on a directory in my system with all text files, and it created and processed across multiple threads as expected.
If you're seeing everything in only 1 thread, it means one or more of these. Please check:

1. That there's only 1 file under c:\temp.
2. That there are multiple files, but all of them except one are probably not text files. readLine() will block forever on such files without printing even a single message. Once 7 such non-text files are opened, they'll block all threads in the pool and so block all other files in the directory (including other text files). Put a trace message at the start of run() to confirm whether multiple threads are getting started.
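For example, a hypothetical task class with such a trace at the top of run() (the class and file names here are made up, not from the posted code):

```java
public class TracedTask implements Runnable {
    private final String fileName;

    public TracedTask(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public void run() {
        // Trace at the very start: shows which pool thread picked up which file,
        // before any I/O that might block
        System.out.println(Thread.currentThread().getName() + " started " + fileName);
        // ... read the file and process its records ...
    }

    public static void main(String[] args) {
        new TracedTask("sample.txt").run();
    }
}
```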

Best Regards
Karthik
 
Harikumar Venkatesan
Greenhorn
Posts: 13
Thanks Karthik.
 
Greenhorn
Posts: 1
I am also building a similar type of application, but I am taking only one CSV file, for which I fix the number of records. If I give it below 500 records it executes, but if I give the number of records as 1000 then it shows this error after processing 4k to 5k records.

Exception in thread "Thread-6" java.lang.OutOfMemoryError: GC overhead limit exceeded

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-6"
Exception in thread "Thread-5" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOfRange(Arrays.java:3664)
	at java.lang.String.<init>(String.java:201)
	at com.csvreader.CsvReader.readRecord(Unknown Source)
	at hello.readThreadPool.run(readThreadPool.java:65)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "cluster-2-localhost:27017" Exception in thread "Thread-7" java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded

 