I'm having some difficulty with a project at work. Where the old program ran processes one at a time, I was charged with making it threadable. The downside is, it currently tries to run every thread at the same time. There are typically thousands, or tens of thousands, of threads to run, with each one taking several seconds to complete. (Yes, this is a batch process, in case you were wondering.) The threads are all nearly identical.
The details of what they do or why it is set up this way are not important; I have several questions, however. Looking at the Java API, I think ThreadGroups or ThreadPools may be the solution, but I'm not sure.
1. How can I put a limit on the number of threads that run at one time, while making sure that every thread is run (that is, no threads may be discarded)?
2. The data to be processed is read from a table in the database; each record, once processed, needs to be removed from that table IF processing completed successfully. How can a thread tell the calling program that it has successfully completed execution?
3. How can I ensure that hung threads are terminated? Note, each thread launches a child process (again, don't bother asking why- it's too painful to explain). I want to make sure the thread, and its associated child process, no longer take up memory. As long as the record was not removed from the database (see question #2) this record will simply be processed the next time the program is run.
Chad, I recently did the same kind of work: I was enhancing a plain Java program that at times processes millions of records.
Anyway, here are some of the answers you are looking for.
1) Either declare the thread count in your config file and read it from there, or hardcode it in a variable in your program. Declaring it in the config file makes life easier: if you need to change it in production, that won't be a code change.
See my code snippet.
So that is how I handled it. See if this can help you.
2) To answer your question 2, just join the threads you created; that makes sure all the threads are done.
Once control gets past this for loop, all of your threads have finished processing.
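The snippets referred to above are not shown in the thread, so the following is only a rough sketch of the idea and not the poster's actual code. It assumes Java 8 or later, a made-up property key `batch.threads`, and a trivial stand-in for the real work. Note that join() only tells the caller that a thread has finished, not whether it succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

// All names here are hypothetical; "batch.threads" is an invented property key.
final class JoinAllWorkers {

    /** Reads the worker count from configuration, falling back to a default. */
    static int threadCount(Properties config, int fallback) {
        return Integer.parseInt(config.getProperty("batch.threads", String.valueOf(fallback)));
    }

    /** Splits the records across a fixed number of threads and waits for all of them. */
    static int processAll(List<String> records, int threads) {
        AtomicInteger processed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < threads; w++) {
            final int offset = w;
            Thread t = new Thread(() -> {
                // Worker w handles records w, w + threads, w + 2 * threads, ...
                for (int i = offset; i < records.size(); i += threads) {
                    processed.incrementAndGet(); // stand-in for the real processing
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join(); // blocks until this worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return processed.get();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("batch.threads", "4");
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            records.add("record-" + i);
        }
        System.out.println(processAll(records, threadCount(config, 2))); // prints 1000
    }
}
```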
3) I am not sure how to solve this one, but what do you mean by hung threads? If something goes wrong while a thread is processing, it should throw an exception, and you can decide there what to do with it.
Again, the parent and child thread issue should be taken care of by the join() method.
Maybe someone else will shed some more light on this.
Thanks for your prompt reply, Ugender! I think I understand what you're getting at... it makes sense to me, but I think I'd make some modifications. I could write code that would manually track the number of threads running, only allow the main program's loop to continue when a thread was open, track the time of each running thread to terminate it if it went on too long, and make a database call when the thread was finished to remove that record from the list of data to process.
Still, it seems that there should be an easier way. I did some more research; it seems like I should be able to use a ThreadPoolExecutor to limit the number of threads, queuing extra requests until they can be run, setting a keep-alive time, and having the threads call a callback method in the main class at the end of their code.
There's a problem with this approach, however. I'm reading the Java 1.5 API for ThreadPoolExecutor (http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html). Keep-alive times only work when a) the pool has more than corePoolSize threads and b) the threads have been idle too long. Why is that a problem? a) Using an unbounded queue will queue all threads above and beyond corePoolSize. b) Using a bounded queue will, I think, discard extra thread requests beyond the boundary. IF I understand all of this correctly (it's all new to me) this means that if the corePoolSize was, say, 5, then there would NEVER be more than 5 threads running... but idle threads would only be terminated if there were more than 5... meaning that, potentially, 5 threads could become idle one by one (a lot can happen in 20,000 threads) and the program would be stuck waiting forever.
But I'm likely misunderstanding this completely. It just seems like it should be easy to say "yo- I'm going to fling a hundred thousand items at you, but I only want you to process them five at a time. Shut threads down if they take too long to complete, and let me know when they're done, okay?" Any thoughts?
I may be lost on something, but why not change the point of view?
Instead of creating a thread to process a request, you could create a group of consumers for a queue of requests. Each consumer will check the queue, get a request, process it and check the queue again.
You can specify the number of consumers based on your needs.
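A minimal sketch of that consumer arrangement, assuming Java 8 or later. The class name, the `STOP` marker, and the counting stand-in for real work are all invented for illustration; the queue here is unbounded, which may or may not suit tens of thousands of requests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueConsumers {
    // Hypothetical end-of-work marker; one copy is queued per consumer.
    private static final String STOP = "__STOP__";

    /** Starts `consumers` threads that drain a shared queue; returns how many requests were handled. */
    static int run(int consumers, int requests) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String request = queue.take(); // waits until work is available
                        if (STOP.equals(request)) {
                            return; // no more work for this consumer
                        }
                        handled.incrementAndGet(); // stand-in for processing the request
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (int i = 0; i < requests; i++) {
                queue.put("request-" + i);
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(STOP);
            }
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding the queue", e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(run(5, 20000)); // prints 20000
    }
}
```

This is essentially what a thread pool does internally, which is why the executor classes discussed in the rest of the thread are usually the shorter route.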
[Chad]: Keep-alive times only work when a) the pool has more than corePoolSize threads and b) the threads have been idle too long. Why is that a problem?
Well, maybe it isn't - in which case you can set core pool size and max pool size equal to each other, and then the keep-alive time will be irrelevant. Or you could use the simpler Executors.newFixedThreadPool(). But the reason for having keep-alive times is this: you may want your pool to have the ability to grow somewhat in response to demand (when there are a lot of requests), but later when demand is reduced, you want the pool to shrink back down to a "normal" level. Perhaps because the machine you're running on has other processes running, and you don't want to tie up resources with a huge pool if it's not being used. The keep-alive time is all about how quickly the pool should shrink back down to core size if it has extra unused threads. Maybe you want to dismiss excess threads immediately, or maybe you want them around a little longer to react in case additional requests show up.
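As a rough illustration of the fixed-size option, the sketch below pushes many short tasks through Executors.newFixedThreadPool() and records the highest number running at once. The class name, the counters, and the 2 ms sleep standing in for real work are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedPoolDemo {

    /** Runs `tasks` short jobs on `poolSize` threads; returns {completed, peak concurrency}. */
    static int[] run(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                try {
                    Thread.sleep(2); // stand-in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    completed.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = run(5, 500);
        System.out.println("completed=" + result[0] + ", peak=" + result[1]);
    }
}
```

Every submitted task runs, and the peak never exceeds the pool size, because the fixed pool queues the surplus on an unbounded queue.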
[Chad]: a) Using an unbounded queue will queue all threads above and beyond corePoolSize. b) Using a bounded queue will, I think, discard extra thread requests beyond the boundary.
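Not quite: with a bounded queue, a request that doesn't fit is rejected rather than blocked, but only once the pool has also reached maximumPoolSize. Note that the BlockingQueue interface has both offer() and put() methods; ThreadPoolExecutor uses the non-blocking offer(). If offer() fails, the executor tries to start a new thread (up to maximumPoolSize), and if that fails too, it hands the task to its RejectedExecutionHandler. The default handler, AbortPolicy, throws a RejectedExecutionException from execute() or submit(), so the task is not silently lost. If you would rather slow the submitter down than lose work, ThreadPoolExecutor.CallerRunsPolicy runs the rejected task in the submitting thread.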
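One way to check what actually happens when a bounded queue fills up is to try it. The sketch below (class and method names are made up) builds a pool with one thread and a queue of capacity one, blocks both slots, and then submits a third task under two of the standard handlers.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SaturationDemo {

    /** Describes what happened to a third task once the worker and the queue were both occupied. */
    static String thirdTask(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the slot until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the single queue slot
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            boolean inCaller = Thread.currentThread().getName().equals(ranOn[0]);
            return inCaller ? "ran in caller" : "accepted";
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTask(new ThreadPoolExecutor.AbortPolicy()));      // prints "rejected"
        System.out.println(thirdTask(new ThreadPoolExecutor.CallerRunsPolicy())); // prints "ran in caller"
    }
}
```

For a batch job where no task may be dropped, CallerRunsPolicy or an unbounded queue are the usual choices; the former also throttles the submitting loop.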
[Chad]: IF I understand all of this correctly (it's all new to me) this means that if the corePoolSize was, say, 5, then there would NEVER be more than 5 threads running...
No, you're confusing corePoolSize with maximumPoolSize. If core size is 5 but max is 10, then the pool will start at size 0 and grow freely up to 5 as tasks arrive. It can grow over 5, up to 10, but only when the queue is full, so with an unbounded queue it will in fact stay at 5. Those extra threads can go away again if they're not kept busy. Once the pool size gets up to 5, it will never go under 5 unless you invoke a shutdown method. But with a bounded queue it can certainly go over.
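The interaction between corePoolSize, maximumPoolSize, and the queue type can be observed directly. This is a small experiment rather than production code; the class name and the 2/4 sizing are chosen only for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGrowthDemo {

    /** Submits `tasks` blocking jobs to a pool with core size 2 and max size 4; returns the pool size. */
    static int poolSizeAfter(BlockingQueue<Runnable> queue, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.SECONDS, queue);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy while we measure
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks wait in the queue, the pool stays at core size.
        System.out.println(poolSizeAfter(new LinkedBlockingQueue<>(), 6)); // prints 2
        // Queue of capacity 2: once it is full, the pool grows toward the maximum.
        System.out.println(poolSizeAfter(new ArrayBlockingQueue<>(2), 6)); // prints 4
    }
}
```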
[Chad]: But I'm likely misunderstanding this completely. It just seems like it should be easy to say "yo- I'm going to fling a hundred thousand items at you, but I only want you to process them five at a time. Shut threads down if they take too long to complete, and let me know when they're done, okay?" Any thoughts?
The first part is easy, but shutting things down if they take too long, that's a lot harder. If you can write the tasks to be interruptible (using Thread.interrupt()) that would help considerably. If not though, there's no way to get a guaranteed thread shutdown in Java. This can get to be a much more involved topic than thread pooling, so I'll skip it for now. [ December 11, 2007: Message edited by: Jim Yingst ]
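For tasks that can be written to respond to interruption, a time limit might look roughly like this. The sketch assumes the work is an interruptible call (Thread.sleep() stands in for it here), and the names are invented. It does nothing for code that ignores interrupts.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutCancelDemo {

    /** Returns "done", "timed out" or "failed" for a task that needs `workMillis` to finish. */
    static String runWithLimit(long workMillis, long limitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> future = pool.submit(() -> {
            try {
                Thread.sleep(workMillis); // interruptible stand-in for the real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cooperate: stop promptly
            }
        });
        try {
            future.get(limitMillis, TimeUnit.MILLISECONDS); // wait, but not forever
            return "done";
        } catch (TimeoutException e) {
            future.cancel(true); // true means: interrupt the thread running the task
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return "failed";
        } catch (ExecutionException e) {
            return "failed"; // the task itself threw an exception
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithLimit(10, 2000));  // prints "done"
        System.out.println(runWithLimit(5000, 100)); // prints "timed out"
    }
}
```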
Very well explained by Jim. I just wanted to add the following (for the "let me know when you're done" part).
There are basically the following ways of doing this:
You can override the afterExecute() method of ThreadPoolExecutor. It is called after each task completes, so you can keep a count there of how many tasks have finished.
The submit() call on ThreadPoolExecutor returns a Future. You can query the Future for completion.
java.util.concurrent.ExecutorCompletionService can be used to wrap an Executor. It provides methods to get completed tasks from the Executor. If you know the number of tasks that you are going to submit, then you can just keep taking from the service, and when the count equals the number of tasks submitted, you are done.
You can use the getCompletedTaskCount() method of ThreadPoolExecutor. The problem with this method is that it does not block (nor is it meant to), so you have to call it in a loop, sleeping in between, which is kind of yuck.
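Of these options, the ExecutorCompletionService one can be sketched as follows. The failure rule (every tenth record throws) is invented purely so the example has something to count; in the batch job described earlier, a successful result would be the point at which the record is deleted from the table.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionCountDemo {

    /** Submits one task per record and returns how many of them reported success. */
    static int countSuccesses(int records, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CompletionService<Boolean> done = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < records; i++) {
                final int id = i;
                done.submit(() -> {
                    if (id % 10 == 0) {
                        // Simulated failure, hypothetical rule for the demo only.
                        throw new IllegalStateException("simulated failure for record " + id);
                    }
                    return Boolean.TRUE;
                });
            }
            int succeeded = 0;
            for (int i = 0; i < records; i++) { // exactly one take() per submitted task
                try {
                    if (done.take().get()) {
                        succeeded++; // here the caller could delete the record
                    }
                } catch (ExecutionException e) {
                    // The task threw; leave its record for the next run.
                }
            }
            return succeeded;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countSuccesses(100, 5)); // prints 90
    }
}
```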
Thanks for all the suggestions! This is what I ended up trying:
I'm calling Executors.newFixedThreadPool to create a pool with the number of threads specified in a property file, and then instead of starting individual threads, I call the execute() method of the pool, passing the thread.
At the end of the thread's run() method, it calls a method in the class that actually started the thread. This method runs code meant to wait upon successful completion of the thread. If there is an error, the thread will throw an exception, so if it reached the end of the run() method I'm assuming everything is ok. Since I have four separate programs needing to run these same threads for the same class, I have them all implement an interface with that callback method, although each one has different code in the body of the method.
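The arrangement described here might look something like the sketch below. It is a guess at the shape, not the poster's code: `RecordListener`, `RecordTask`, and the empty-id failure rule are all invented names and assumptions. One detail worth noting is that the callback runs on the pool's worker thread, so whatever it touches has to be thread-safe.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback implemented by each program that launches tasks. */
interface RecordListener {
    void recordSucceeded(String recordId);
}

final class RecordTask implements Runnable {
    private final String recordId;
    private final RecordListener listener;

    RecordTask(String recordId, RecordListener listener) {
        this.recordId = recordId;
        this.listener = listener;
    }

    @Override
    public void run() {
        if (recordId.isEmpty()) {
            throw new IllegalArgumentException("empty record id"); // simulated failure
        }
        // ... the real processing would happen here ...
        listener.recordSucceeded(recordId); // reached only if nothing above threw
    }
}

final class CallbackDemo {

    /** Runs one task per record id and returns the ids whose callback fired. */
    static Set<String> process(String[] recordIds, int poolSize) {
        Set<String> succeeded = ConcurrentHashMap.newKeySet(); // thread-safe set
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (String id : recordIds) {
            pool.execute(new RecordTask(id, succeeded::add));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return succeeded;
    }

    public static void main(String[] args) {
        // The empty id fails; its stack trace goes to stderr via the default handler.
        System.out.println(process(new String[] { "a", "", "b" }, 2).size()); // prints 2
    }
}
```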
I don�t know if that�s the best way, but it SEEMS to work. I hope. I need to do more testing.
I�m still running into that issue, however: stopping threads once they�ve exceeded a time limit. I�ve been reading more on threads, and I haven�t found anything that seems like it will work. I can�t have the thread continually check for interrupts; you see, its main function is to call third-party software (FOP, for those of you who have had the misfortune to deal with it), and that�s where it would hang if anywhere.
I thought I may just have a second thread class that creates the real thread, joins it with a time limit, and then once it wakes up, checks to see if the thread is alive and kills it if it is. The only problem with that is� apparently there�s no good way to kill the thread? The appropriate methods are deprecated, interrupt() is useless in this case (as I explained in the last paragraph), and I think I�m correct in stating that e ven if this �outer� thread terminates, the �inner� thread will continue merrily along to process, even if it is stuck in an infinite loop? Is there any way to make the thread stop?
Well, there is the Thread.stop() method. It's deprecated, for good reason, but it may be possible to use it if you're very careful. In particular you should only use it if you're certain the thread you're stopping could not possibly hold any locks or share any data with your remaining threads. Which is a bit unusual, as usually you have some sort of inter-thread communication going on which represents shared data. You may be able to get the threads to communicate via mechanisms outside the JVM - for example, a thread could write all its results to a temp file, and then as its last act (signifying successful completion of the thread) rename the file and/or move it to a different directory. Other threads could then know that the work was completed because the file appeared with the new name or directory. You'd probably need a thread to scan for old temp files that hadn't been touched in a while, left over from killed threads, and delete them and/or reassign their tasks to new threads. This can get complex, and I have no idea if it fits the sort of work you're doing - it's just an example of how threads could possibly communicate through an external resource outside the JVM (the filesystem in this case) and thereby be immune to the undesirable effects of Thread.stop(). Maybe.
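The rename-as-last-act idea could be sketched like this, using the java.nio.file API (Java 7 and later, so newer than this thread). The `.tmp` and `.done` suffixes and all names are assumptions for illustration, and ATOMIC_MOVE is only guaranteed where the filesystem supports it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class RenameOnSuccess {

    /** Creates a scratch directory for the demo. */
    static Path newWorkDir() {
        try {
            return Files.createTempDirectory("batch-demo");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the result to name.tmp, then renames it to name.done as the very last step. */
    static Path writeResult(Path dir, String name, String result) {
        try {
            Path tmp = dir.resolve(name + ".tmp");
            Path done = dir.resolve(name + ".done");
            Files.write(tmp, result.getBytes(StandardCharsets.UTF_8));
            // A worker killed before this line leaves only the .tmp file behind.
            return Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Other threads (or a later run) treat the .done file as the success signal. */
    static boolean isComplete(Path dir, String name) {
        return Files.exists(dir.resolve(name + ".done"));
    }

    public static void main(String[] args) {
        Path dir = newWorkDir();
        System.out.println(isComplete(dir, "job1")); // prints false
        writeResult(dir, "job1", "ok");
        System.out.println(isComplete(dir, "job1")); // prints true
    }
}
```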
Earlier you mentioned how each thread has an associated Process - is that what's running FOP? Is that Apache's Formatting Objects Processor, or something else? Regardless, you can kill a Process much more safely than a Thread. Just use Process.destroy(). The streams from the process should terminate (prematurely), and that's the extent of the interaction with the JVM. This may allow you to then finish up the Thread quickly. Of course whatever the process was doing, you may also have to clean up some external resource (files, a database, whatever). Hope that helps...
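A rough sketch of putting a time limit on the child process rather than on the thread. It relies on Process.waitFor(long, TimeUnit) and destroyForcibly(), which arrived in Java 8, so it goes beyond what was available when this thread was written. The `sleep` command in main() is only a stand-in for a hung child and assumes a Unix-like system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.TimeUnit;

final class ProcessTimeout {

    /** Returns true if the command exited on its own within the limit, false if it had to be killed. */
    static boolean runWithLimit(long limitMillis, String... command) {
        Process process;
        try {
            // inheritIO() avoids the child blocking on a full, unread output pipe.
            process = new ProcessBuilder(command).inheritIO().start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            if (process.waitFor(limitMillis, TimeUnit.MILLISECONDS)) {
                return true; // finished in time
            }
            process.destroy(); // polite termination request first
            if (!process.waitFor(2, TimeUnit.SECONDS)) {
                process.destroyForcibly(); // escalate if it is still running
            }
            return false;
        } catch (InterruptedException e) {
            process.destroyForcibly();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumes a Unix-like "sleep" command is on the PATH.
        System.out.println(runWithLimit(200, "sleep", "30")); // prints false
    }
}
```

Because the waiting thread is blocked in waitFor() rather than inside third-party code, it regains control as soon as the limit expires and can leave the record in the table for the next run.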