Hi,
I have a question about the Producer/Consumer problem.
Let's assume I have a file with lots of data.
Each line of the file is defined as a pair
<Type>;<Value>
The task is to read the file line by line using one/multiple
thread(s) (producer(s)) and consume the data by multiple threads (consumers).
The problem is that each line with a given type must be ordered with other lines with the same type.
Example:
A;1
A;2
B;10
C;100
B;11
A;3
...
And we need to ensure that A;1 is processed before A;2.
We can, however, process B;10 before these two lines (A;1 and A;2).
My first solution was to create one thread (a procuder). The producer was responsible for reading the file line by line and putting pair into a BlockingQueue in the order they occur in the file.
I also created two additional BlockingQueues that provided data to two consumer threads. To achieve the proper order I created a ConcurrentHashMap where I stored pairs <
String, Intetger> (which means that the data of type X should go to Y-th queue). To achieve this I created an AtomicInteger counter which determines the number of a queue into which the data is put.
For the example above, the scenario could be:
1) first thread read "A;1", counter is 0, the mapping is not available in the map, so I need to add it and increment the counter (the mappig is: A->0)
1.1) first thread add "A;1" into the first output queue
2) second thread read "A;2", counter is 1, but the mapping is available (A->0), so I don't need to decrement the counter
2.1) second thread add "A;2" into the first output queue
3) first thread read "B;10", counter is 1, the mapping is not available in the map, so I need to add it and decrement the counter (the mappig is: B->1)
3.1) first thread add "B;10" into the Second output queue
4) second thread read "C;100", counter is 0, the mapping is not available in the map, so I need to add it and increment the counter (the mappig is: C->0)
4.1) first thread add "C;100" into the first output queue
...
Every data of the type "A" and "C" will be processed by consumer1 and every data of the type "B" will be processed by the consumer2.
The solution behaves well in the scenario described above, but in real world the second thread can put the line "A;2" before the first thread put the line "A;1".
My second solution was to modify the first one. I created only one thread which was responsible for data split. The solution works, but I wonder if it is effective.
How can I implement such producer/consumer problem and preserve the order for each type?