
synchronization in a loop

Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
Hi all, I have a question about synchronization.
Is it possible to put a synchronized(object) block inside a loop?
This is the case:

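LinkedList<Subscriber> Abbonati = new LinkedList<Subscriber>();

public int SPublish(Event e) {
    synchronized (Abbonati) {
        if (Abbonati.size() == 0) {
            return Abbonati.size();
        }
        for (int id = 0; id < Abbonati.size(); id++) {
            // Lock the event around each individual dispatch.
            synchronized (e) {
                this.Abbonati.get(id).dispatch(e);
            }
        }
        // Every path must return an int.
        return Abbonati.size();
    }
}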

Or is it wrong? Thanks!
Nitesh Kant
Bartender

Joined: Feb 25, 2007
Posts: 1638

Well, it is absolutely legal to synchronize inside a loop or, for that matter, anywhere in your method.
A word of caution though:

When you use nested locks be sure that you take these locks in the same order everywhere in your application. Otherwise, it may cause deadlocks.
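
To illustrate the lock-ordering advice, here is a minimal sketch. The class name, the `subscribers` list, and the `event` object are hypothetical stand-ins, not the poster's classes. Both methods need both locks, and both take `subscribers` first and `event` second.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: every code path that needs both locks takes them in one fixed order. */
final class LockOrderingDemo {
    // Hypothetical stand-ins for the subscriber list and the event.
    private final List<String> subscribers = new ArrayList<>();
    private final Object event = new Object();

    // Ordering rule for this class: lock `subscribers` first, then `event`.
    void addSubscriber(String name) {
        synchronized (subscribers) {
            synchronized (event) {
                subscribers.add(name);
            }
        }
    }

    int publish() {
        synchronized (subscribers) {      // same order as addSubscriber()
            synchronized (event) {
                return subscribers.size();
            }
        }
    }

    /** Exercises both paths concurrently and returns the final subscriber count. */
    static int run(int rounds) {
        LockOrderingDemo demo = new LockOrderingDemo();
        Thread adder = new Thread(() -> {
            for (int i = 0; i < rounds; i++) demo.addSubscriber("s" + i);
        });
        Thread publisher = new Thread(() -> {
            for (int i = 0; i < rounds; i++) demo.publish();
        });
        adder.start();
        publisher.start();
        try {
            adder.join();
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.publish();
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```

If one of the two methods took `event` first and `subscribers` second, each thread could end up holding one lock while waiting for the other, which is the deadlock the warning refers to.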

Also, it may be better to put this synchronization logic inside the dispatch method, rather than the one calling it. This makes sure that no one calls dispatch without synchronization.
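
A minimal sketch of that suggestion, assuming a hypothetical `Event` with mutable state and a simplified `Subscriber` (neither is the poster's real class): the `synchronized (e)` block lives inside `dispatch`, so callers do not have to remember it.

```java
final class DispatchLockDemo {
    /** Hypothetical event type with state that must not be updated concurrently. */
    static final class Event {
        int deliveries = 0;
    }

    /** Hypothetical subscriber: the lock is taken inside dispatch(), not by the caller. */
    static final class Subscriber {
        void dispatch(Event e) {
            synchronized (e) {
                e.deliveries++;
            }
        }
    }

    /** Several threads dispatch the same event without any locking of their own. */
    static int run(int threads, int perThread) {
        Event e = new Event();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            Subscriber s = new Subscriber();
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) s.dispatch(e);
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        synchronized (e) {
            return e.deliveries;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```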


Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
Well...
In this application I have two functions whose role is to publish an event in (respectively) a synchronous and an asynchronous way.
In both cases, I plan to take the lock on the Event I want to publish first, and then the lock on the list of subscribers that want to receive the event, before calling the dispatch(event) method.

Could you take a look at this code? I hope the locks work well.

Thanks!
/* The asynchronous publish. Abbonati is the LinkedList of Subscribers; every
   Subscriber must dispatch events in the same order in which they were published. */
public int Publish(Event e) {
    synchronized (e) {
        synchronized (Abbonati) {
            for (int id = 0; id < this.Abbonati.size(); id++) {
                this.Abbonati.get(id).dispatch(e);
            }
            return this.Abbonati.size();
        }
    }
}

// synchronous publish
public int SPublish(Event e) {
    synchronized (Abbonati) {
        if (Abbonati.size() == 0) {
            return Abbonati.size();
        }
    }
    synchronized (e) {
        synchronized (Abbonati) {
            for (int id = 0; id < Abbonati.size(); id++) {
                new Servitore(e, Abbonati.get(id)).start();
            }
            try {
                e.wait();
            } catch (InterruptedException exception) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            return this.Abbonati.size();
        }
    }
}
Nitesh Kant
Bartender

Joined: Feb 25, 2007
Posts: 1638

Scafuro: Could you take a look at this code? I hope the locks work well.

Whether the locks work well or not depends on the expected behavior.

If the expected behavior is that the same event should not be processed concurrently by more than one publisher AND different events must not get dispatched to the same list of subscribers concurrently, then the locks you have used make sense.

I can see one problem: in the SPublish() method you take a lock on the event and on Abbonati, and then you wait on the event. The wait releases only the event's lock; it does not relinquish the lock on Abbonati. So if some other thread is waiting to get the lock on Abbonati, it will not succeed until SPublish() finishes execution. Be sure that this is the behavior you want.
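
The point about `wait()` can be checked with a small sketch. All names here are hypothetical; `event` and `list` stand in for the Event and for Abbonati. A waiter thread holds both monitors and calls `event.wait()`. The main thread can then enter `synchronized (event)`, while a probe thread asking for `list` stays blocked.

```java
import java.util.concurrent.CountDownLatch;

final class WaitReleasesOneLockDemo {
    /** Returns true if a thread asking for `list` is blocked while the waiter sits in event.wait(). */
    static boolean probeWasBlocked() {
        final Object event = new Object();
        final Object list = new Object();
        final boolean[] done = {false};               // guarded by `event`
        final CountDownLatch holdingBoth = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            synchronized (event) {
                synchronized (list) {
                    holdingBoth.countDown();
                    try {
                        while (!done[0]) {
                            event.wait();             // gives up `event`, keeps `list`
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        Thread probe = new Thread(() -> {
            synchronized (list) {
                // reached only after the waiter has left its synchronized (list) block
            }
        });

        boolean blocked = false;
        try {
            waiter.start();
            holdingBoth.await();
            synchronized (event) {
                // Entering here shows that wait() released the monitor of `event`.
            }
            probe.start();
            for (int i = 0; i < 500 && !blocked; i++) {
                blocked = probe.getState() == Thread.State.BLOCKED;
                Thread.sleep(10);
            }
            synchronized (event) {
                done[0] = true;
                event.notifyAll();                    // let the waiter finish
            }
            waiter.join();
            probe.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return blocked;
    }

    public static void main(String[] args) {
        System.out.println(probeWasBlocked());
    }
}
```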

DISCLAIMER: I have commented based on the little I could understand about your system from your description.
Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
That's right...

The behaviour of the SPublish function is to:
1) recognise whether the list of "abbonati" is empty and, in that case, return abbonati.size();
2) if there are subscribers registered in the abbonati linked list, provide the "dispatch service" for each subscriber registered on this service channel.
SPublish must return an integer value, namely abbonati.size() (the number of subscribers in the system), only after at least one subscriber has completed its dispatch task.
My problem is understanding how to implement this last piece of "synchronization" between the publish method, whose role is to publish an event to each subscriber, and the dispatch method, whose task is to dispatch the event "E" to the specific subscriber it is called on.

In this context, I need to guarantee that the order of the dispatch(event) calls is the same as the order of the publish(event) calls...
Thanks a lot!
Nitesh Kant
Bartender

Joined: Feb 25, 2007
Posts: 1638

Scafuro:
In this context, I need to guarantee that the order of the dispatch(event) calls is the same as the order of the publish(event) calls...
Thanks a lot!


As far as I understand, the following is your problem:
1) There is a program/class that publishes events.
2) Each published event is supposed to be dispatched to the Subscribers.
3) The order of events must be maintained, i.e. if E1, E2, E3 are published in this order, then these events must be dispatched to all the subscribers in the same order, i.e. E1, E2, E3.

If the above is the problem, then I don't think you can achieve the ordering using synchronization.
What you can do is add all the published events to a FIFO queue. You can have one thread take the events from the queue and dispatch them to the subscribers.
It is important that you use only one thread to pick events from the queue, as that will guarantee the order.
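
The queue-plus-single-thread idea can be sketched as follows. This is an illustration under assumptions: `OrderedChannel`, its one-method `Subscriber` interface, and the `STOP` sentinel are hypothetical names, and events are plain strings for brevity. It uses `java.util.concurrent.LinkedBlockingQueue` from JDK 5 onwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class OrderedChannel {
    /** Hypothetical subscriber interface, simplified to String events. */
    interface Subscriber {
        void dispatch(String event);
    }

    private static final String STOP = new String("STOP"); // sentinel, compared by identity
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<Subscriber> subscribers;
    private final Thread dispatcher = new Thread(this::drain);

    OrderedChannel(List<Subscriber> subscribers) {
        this.subscribers = new ArrayList<>(subscribers);
    }

    void start() {
        dispatcher.start();
    }

    /** Publishers only enqueue; they never call dispatch() themselves. */
    void publish(String event) {
        queue.add(event);
    }

    void stop() throws InterruptedException {
        queue.add(STOP);
        dispatcher.join();
    }

    /** The single consumer: takes events in FIFO order and hands each one to every subscriber. */
    private void drain() {
        try {
            while (true) {
                String event = queue.take();
                if (event == STOP) {
                    return;
                }
                for (Subscriber s : subscribers) {
                    s.dispatch(event);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Publishes E1, E2, E3 and returns what one recording subscriber saw. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();   // written by the dispatcher, read after join()
        Subscriber recorder = event -> seen.add(event);
        OrderedChannel channel = new OrderedChannel(Arrays.asList(recorder));
        channel.start();
        try {
            channel.publish("E1");
            channel.publish("E2");
            channel.publish("E3");
            channel.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [E1, E2, E3]
    }
}
```

Because only the dispatcher thread calls `dispatch`, each subscriber sees events in queue order without any `synchronized` block in the subscriber.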

BTW, what you are trying to achieve is a common messaging paradigm. JMS calls this a topic. Multiple publishers can publish events to a topic, and multiple subscribers can listen for events on the topic.
Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
OK, I understand...
But my SPublish(event) function must dispatch a single event to every subscriber registered in the system.
I mean that SPublish(event) is called each time by another function whose role is to take the lock on a FIFO of events in the system: every time the system calls SPublish(selectedevent) for an event, SPublish must dispatch that event to every subscriber.
But, as you said, I must guarantee that, for every subscriber, the order of dispatch is the same as the order of publish.

So SPublish dispatches the specific Event (the argument of the function) to every subscriber by starting a producer thread whose role is to call the dispatch function in the subscriber code. The dispatch function takes the lock on the Event passed as its argument and, when the dispatch ends, calls event.notify(), which, I think, wakes up SPublish(event), which is waiting for at least one subscriber to complete its dispatch...
Here is the latest code, following your advice:

CLASS SYSTEM
public int SPublish(Event e) {
    int dimensione = 0;
    synchronized (Abbonati) {
        if (Abbonati.size() == 0) {
            return Abbonati.size();
        }
    }
    synchronized (e) {
        synchronized (Abbonati) {
            for (int id = 0; id < Abbonati.size(); id++) {
                new Servitore(e, Abbonati.get(id)).start();
            }
            dimensione = Abbonati.size();
            e.notifyAll();
        }
        try {
            e.wait();
        }
        catch (InterruptedException exception) {}
    }
    return dimensione;
}

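CLASS SUBSCRIBERIMPL
import java.util.*;

public class SubscriberImpl implements Subscriber, Runnable {
    private LinkedList<Event> evento;

    public SubscriberImpl() {
        evento = new LinkedList<Event>();
    }

    public void dispatch(Event e) {
        synchronized (evento) {       // guard the list, which run() reads on another thread
            evento.add(e);
        }
        new Thread(this).start();     // start(), not run(): run() would execute on the caller's thread
    }

    public void run() {
        Event first;
        synchronized (evento) {       // wait() requires holding evento's monitor
            while (evento.size() == 0) {
                System.out.println("Waiting for events");
                try {
                    this.evento.wait();
                }
                catch (InterruptedException e) {}
            }
            first = this.evento.getFirst();
        }
        synchronized (first) {
            try {
                Thread.sleep(3000);   // sleep() is static, so call it on the class
                System.out.println("Dispatch");
            }
            catch (Exception ee) {}
            synchronized (evento) {   // notify() also requires holding evento's monitor
                this.evento.removeFirst();
                this.evento.notify();
            }
        }
    }
}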

THE SERVER THREAD CALLED FROM THE SYSTEM:
import java.util.LinkedList;

public class Servitore extends Thread {
    private Event E;
    private Subscriber Subscriber;

    public Servitore(Event E, Subscriber Subscriber) {
        this.E = E;
        this.Subscriber = Subscriber;
    }

    public void run() {
        synchronized (E) {
            try {
                // wait for wake-up
                Subscriber.dispatch(E);
            }
            catch (Exception error) {}
        }
    }
}
Nitesh Kant
Bartender

Joined: Feb 25, 2007
Posts: 1638

There are two things I want to say:
  • The event queue must not be in the subscriber. It should be maintained by the class System.
  • It is not a good idea to use wait-notify between a dispatcher and the subscriber. You may provide a well-defined method for the subscriber to acknowledge completion of event processing. This applies when the subscriber is processing the event asynchronously.

P.S.: Please UseCodeTags while posting real code.
Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
Cool...
Earlier you said it is better to use only one server thread for each event.
So I modified the code, and now I launch a server thread that takes two arguments: the event I want to publish (the argument of the SPublish(event) method) and the list of Subscribers in the System.
Now the server thread "servitore" is started by the system, and in its run method it calls the dispatch method for each subscriber. Is that right? Here is the code:


Now the dispatch method called for each subscriber no longer has an event queue, as you (rightly!) said.
Here is the code:


OK, now the problem is how to synchronize the completion of at least one dispatch (for a generic event E processed by the dispatch function) with SPublish(event E), in the sense that SPublish must wait for at least one dispatch to complete before it ends and returns the size of the subscriber list.
You said not to use wait and notify...
What I had in mind was:
SYSTEM SIDE
1) start the server thread for this event E
2)
SUBSCRIBER SIDE
1) take the lock on the event assigned by the server thread
2) print a simple message for the dispatch (execution)
3) after closing the synchronized(event) block, signal with event.notify() to let the System's SPublish method exit from the "event.wait()" statement.

If it doesn't work, as I think you said, could you propose a solution to this synchronisation problem?

Really, many thanks!
Nitesh Kant
Bartender

Joined: Feb 25, 2007
Posts: 1638

Scafuro: So I modified the code, and now I launch a server thread that takes two arguments: the event I want to publish (the argument of the SPublish(event) method) and the list of Subscribers in the System.

As I suggested before, I think you should have a queue of events here for the Servitore thread. The thread should poll this queue for events. Whenever it gets an event, it should dispatch it to the list of subscribers.
This is exactly what a ThreadPool provides you.
Executors provide a way to create a fixed-size thread pool. You can use this for the threads-polling-a-queue infrastructure.
If you are on a JDK prior to 5, use the backport of the concurrent utils.
Storing the event object inside the Servitore thread binds it to a single event, i.e. you will have as many of these threads as there are events. Thread creation is an expensive operation, so you should use a thread pool here.
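
As a rough sketch of the thread-pool suggestion: `Executors.newFixedThreadPool` (JDK 5+) creates a pool backed by an internal task queue. The class name `PooledDispatch` and the use of strings as events are assumptions made for illustration. The pool size is 1 here because the thread requires events to be dispatched in publication order; a larger pool would run tasks concurrently and lose that guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PooledDispatch {
    /** Submits one dispatch task per event and returns the order in which they ran. */
    static List<String> run(List<String> events) {
        List<String> delivered = Collections.synchronizedList(new ArrayList<String>());
        // One reusable worker thread instead of one new Thread per event.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (String event : events) {
            pool.execute(() -> delivered.add(event));   // stands in for dispatching to all subscribers
        }
        pool.shutdown();                                // no new tasks; queued ones still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("dispatch did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("E1", "E2", "E3")));   // prints [E1, E2, E3]
    }
}
```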

Scafuro: If it doesn't work, as I think you said, could you propose a solution to this synchronisation problem?

I did not say it does not work. What I said was that it is not a good idea to expose the wait-notify mechanism to the subscribers. Your whole processing then depends on the subscriber. (What if a subscriber implementation forgets to send a notify?)
Instead you can modify the Subscriber dispatch method as follows:



In the above, Callback is an interface that has only a done() method. The Servitore class can implement this interface to handle completion of the event processing by the subscriber.
You can implement it using wait-notify or anything else. The subscriber never knows about it.
I don't think you will need any synchronized blocks in Subscriber after this.
Also, you can remove the synchronization on Abbonati in the Servitore thread, as Abbonati does not change.
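
A minimal sketch of the callback idea, under stated assumptions: the two-argument `dispatch(event, callback)` signature, the `dispatchAndWait` helper, and the use of `CountDownLatch` in place of wait-notify are illustrative choices, not the code from the original post. Only the `Callback` interface with a single `done()` method and a `Servitore` that implements it come from the description.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class CallbackDemo {
    /** Completion hook, as described: a single done() method. */
    interface Callback {
        void done();
    }

    /** Assumed signature: the subscriber receives the callback along with the event. */
    interface Subscriber {
        void dispatch(String event, Callback callback);
    }

    /** Dispatcher side. How it waits is hidden from the subscriber. */
    static final class Servitore implements Callback {
        private final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void done() {
            finished.countDown();
        }

        void dispatchAndWait(Subscriber subscriber, String event) throws InterruptedException {
            subscriber.dispatch(event, this);
            finished.await();   // returns once the subscriber has called done()
        }
    }

    /** Dispatches to a subscriber that works on its own thread, then reports whether it finished. */
    static boolean run() {
        AtomicBoolean processed = new AtomicBoolean(false);
        Subscriber asyncSubscriber = (event, callback) -> new Thread(() -> {
            processed.set(true);   // stands in for the real event processing
            callback.done();
        }).start();
        try {
            new Servitore().dispatchAndWait(asyncSubscriber, "E1");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber only ever sees `callback.done()`, so swapping the latch for wait-notify inside `Servitore` would not change any subscriber code.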

BTW, it is better to implement Runnable than to extend Thread. Read this for more info on this.
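
For illustration, a small sketch of the Runnable style; `DispatchTask` and the thread name `servitore-1` are hypothetical. The task describes the work, and a plain `Thread` (or a pool) decides where it runs.

```java
final class RunnableDemo {
    /** The work to do, independent of which thread ends up running it. */
    static final class DispatchTask implements Runnable {
        private final String event;
        private volatile String handledBy;

        DispatchTask(String event) {
            this.event = event;
        }

        @Override
        public void run() {
            handledBy = Thread.currentThread().getName() + " handled " + event;
        }

        String handledBy() {
            return handledBy;
        }
    }

    /** Runs the task on a named worker thread and reports who handled it. */
    static String run() {
        DispatchTask task = new DispatchTask("E1");
        Thread worker = new Thread(task, "servitore-1");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.handledBy();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints servitore-1 handled E1
    }
}
```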
Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
Honestly, I'm learning a lot about concurrency... Thank you, Nitesh!
I searched the Internet for information about thread pools... but I don't understand them very well...
BTW, the callback.done() method could be a good idea, but in my case I must work with a dispatch(event) method which takes only one argument (this is an exercise for a software architecture exam in a software engineering faculty).

In the end, I think that:
1) I start a Servitore thread each time the SPublish or Publish method is called.
2) Once started, the Servitore thread waits in a synchronized block for the notification that a new event has been added to the channel's list.
3) The Servitore thread then takes the first event in the list of events and processes it, dispatching the event to each subscriber.
4) The dispatch method called from the Servitore thread takes the lock on the event passed to it as the function argument, performs the dispatch, and then signals completion with event.notify(); this is the notification the System is waiting for, in a synchronized block, in order to end its SPublish method.

Here is my (for now) final code:

SYSTEM


SERVITORE-THREAD


SUBSCRIBERIMPL
Scafuro Te
Greenhorn

Joined: Oct 08, 2007
Posts: 18
Hi all,
hi Nitesh!
I'd like to post the right version of the solution to the problem I asked about. This is the solution to:
1) have one server thread for each subscriber to a pay-TV channel system
2) let every subscriber receive an event in a synchronous or asynchronous way
3) see events in the same order in which they were received from the channel

CHANNEL


SERVER THREAD


SUBSCRIBER


THANKS, ALL!
     