| Author |
non blocking socket
|
Rajeswaranr Ramachandran
Greenhorn
Joined: Aug 25, 2006
Posts: 1
|
|
|
How do I create a non-blocking socket in Java, similar to the select() system call?
|
 |
sathish rontala
Greenhorn
Joined: Aug 28, 2006
Posts: 3
|
|
// hi Check This code you will get it
package com.atic.streaming;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Category;

/**
 * Single-threaded line-echo server built on a java.nio {@link Selector}.
 *
 * One selector multiplexes the listening socket and every accepted client,
 * much like a select() loop in C. Each client is greeted with a prompt; text
 * it sends is accumulated until a newline arrives and is then echoed back.
 * A client sending "quit" is disconnected; a client sending "shutdown" stops
 * the whole server.
 */
public class NonBlockingServer {
    int port = 8001;
    Selector selector = null;
    ServerSocketChannel selectableChannel = null;
    int keysAdded = 0;

    static Category log = Category.getInstance(NonBlockingServer.class.getName());

    /** Text that makes the server drop the client that sent it. */
    static final String QUIT_SERVER = "quit";
    /** Text that makes the server leave its accept loop. */
    static final String SHUTDOWN = "shutdown";
    /** Size in bytes of the buffer used for each individual read. */
    static final int BUFSIZE = 8;

    /** Greeting sent once to every newly accepted client. */
    private static final String PROMPT = "What is your name? ";
    /** Charset used for both directions so reads and writes agree. */
    private static final Charset CHARSET = Charset.forName("us-ascii");

    /** Creates a server that will listen on the default port (8001). */
    public NonBlockingServer() {
    }

    /**
     * Creates a server that will listen on the given port.
     *
     * @param port TCP port to bind in {@link #initialize()}
     */
    public NonBlockingServer(int port) {
        this.port = port;
    }

    /**
     * Opens the selector and the non-blocking listening channel and binds the
     * channel to the local host address on the configured port.
     *
     * @throws IOException if the selector or channel cannot be opened or bound
     */
    public void initialize() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        this.selectableChannel = ServerSocketChannel.open();
        this.selectableChannel.configureBlocking(false);
        InetAddress lh = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(lh, this.port);
        this.selectableChannel.socket().bind(isa);
    }

    /**
     * Closes every channel registered with the selector, the listening
     * channel, and the selector itself. Safe to call when
     * {@link #initialize()} was never run or when already closed.
     *
     * @throws IOException if closing the listening channel or selector fails
     */
    public void close() throws IOException {
        try {
            if (this.selector != null && this.selector.isOpen()) {
                // Snapshot the key set so closing channels cannot disturb the iteration.
                Object[] registered = this.selector.keys().toArray();
                for (int k = 0; k < registered.length; k++) {
                    closeQuietly(((SelectionKey) registered[k]).channel());
                }
            }
        } finally {
            try {
                if (this.selectableChannel != null) {
                    this.selectableChannel.close();
                }
            } finally {
                if (this.selector != null) {
                    this.selector.close();
                }
            }
        }
    }

    /**
     * Releases the server's channels and selector; delegates to {@link #close()}.
     *
     * @throws IOException if closing the listening channel or selector fails
     */
    public void finalize() throws IOException {
        close();
    }

    /**
     * Runs the selector loop: accepts new clients and dispatches readable
     * clients to {@link #readMessage(ChannelCallback)}. The loop ends when
     * {@code select()} returns 0 or when a client requests shutdown.
     *
     * An I/O failure on one client closes only that client.
     *
     * @throws IOException if the selector or the listening channel fails
     * @throws InterruptedException if a client sent the shutdown command
     */
    public void acceptConnections() throws IOException, InterruptedException {
        SelectionKey acceptKey =
                this.selectableChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        log.debug("Acceptor loop...");
        while ((this.keysAdded = acceptKey.selector().select()) > 0) {
            log.debug("Selector returned " + this.keysAdded + " ready for IO operations");
            Set readyKeys = this.selector.selectedKeys();
            Iterator i = readyKeys.iterator();
            while (i.hasNext()) {
                SelectionKey key = (SelectionKey) i.next();
                // Selected keys are not cleared automatically; remove each one as it is handled.
                i.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    acceptClient(key);
                } else if (key.isReadable()) {
                    log.debug("Processing selection key read=" + key.isReadable()
                            + " write=" + key.isWritable()
                            + " accept=" + key.isAcceptable());
                    try {
                        this.readMessage((ChannelCallback) key.attachment());
                    } catch (IOException e) {
                        // A reset connection or undecodable input affects this client only.
                        log.warn("Dropping client after I/O error", e);
                        closeQuietly(key.channel());
                    }
                }
            }
        }
        log.debug("End acceptor loop...");
    }

    /**
     * Accepts one pending connection, registers it for reads and sends the
     * greeting prompt.
     *
     * The channel is registered for OP_READ only: a socket is write-ready
     * almost all the time, so registering OP_WRITE permanently would make
     * {@code select()} return immediately on every pass.
     */
    private void acceptClient(SelectionKey key) throws IOException {
        ServerSocketChannel nextReady = (ServerSocketChannel) key.channel();
        log.debug("Processing selection key read=" + key.isReadable()
                + " write=" + key.isWritable()
                + " accept=" + key.isAcceptable());
        SocketChannel channel = nextReady.accept();
        if (channel == null) {
            // Non-blocking accept() returns null when no connection is pending any more.
            return;
        }
        channel.configureBlocking(false);
        SelectionKey readKey = channel.register(this.selector, SelectionKey.OP_READ);
        readKey.attach(new ChannelCallback(channel));
        try {
            writeMessage(channel, PROMPT);
        } catch (IOException e) {
            log.warn("Dropping client after failed greeting", e);
            closeQuietly(channel);
        }
    }

    /** Closes a channel, logging instead of propagating any failure. */
    private void closeQuietly(SelectableChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            log.warn("Failed to close channel", e);
        }
    }

    /**
     * Encodes the message as US-ASCII and writes it to the channel with a
     * single non-blocking write.
     *
     * @param channel destination channel
     * @param message text to send
     * @throws IOException if the write fails
     */
    public void writeMessage(SocketChannel channel, String message) throws IOException {
        ByteBuffer buf = CHARSET.encode(message);
        int nbytes = channel.write(buf);
        log.debug("Wrote " + nbytes + " to channel.");
        if (buf.hasRemaining()) {
            // A non-blocking write may accept fewer bytes than offered; report it rather than hide it.
            log.warn(buf.remaining() + " bytes were not written to channel.");
        }
    }

    /**
     * Decodes the remaining bytes of the buffer as US-ASCII text.
     *
     * @param byteBuffer buffer positioned at the first byte to decode
     * @return the decoded text
     * @throws CharacterCodingException if the bytes are not valid US-ASCII
     */
    public String decode(ByteBuffer byteBuffer) throws CharacterCodingException {
        CharsetDecoder decoder = CHARSET.newDecoder();
        CharBuffer charBuffer = decoder.decode(byteBuffer);
        return charBuffer.toString();
    }

    /**
     * Reads up to {@link #BUFSIZE} bytes from the client and acts on the text
     * accumulated so far: closes the client on "quit", closes it and signals
     * shutdown on "shutdown", or echoes the accumulated text once a newline
     * has been received.
     *
     * Commands are matched against the accumulated text rather than the
     * current chunk, so a command split across two reads is still recognised.
     *
     * @param callback per-client state attached to the selection key
     * @throws IOException if reading, decoding or echoing fails
     * @throws InterruptedException if the client sent the shutdown command
     */
    public void readMessage(ChannelCallback callback) throws IOException, InterruptedException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFSIZE);
        int nbytes = callback.getChannel().read(byteBuffer);
        if (nbytes < 0) {
            // End of stream: the peer closed the connection; closing also cancels the key.
            callback.getChannel().close();
            return;
        }
        byteBuffer.flip();
        String result = this.decode(byteBuffer);
        log.debug(result);
        callback.append(result);
        if (callback.contains(QUIT_SERVER)) {
            callback.getChannel().close();
        } else if (callback.contains(SHUTDOWN)) {
            callback.getChannel().close();
            throw new InterruptedException();
        } else if (result.indexOf("\n") >= 0) {
            // If we are done with the line then we execute the callback.
            callback.execute();
        }
    }

    /**
     * Per-client state: the client's channel plus the text received from it
     * since the last echo.
     */
    public class ChannelCallback {
        private SocketChannel channel;
        private StringBuffer buffer;

        /**
         * @param channel the client channel this callback belongs to
         */
        public ChannelCallback(SocketChannel channel) {
            this.channel = channel;
            this.buffer = new StringBuffer();
        }

        /**
         * Echoes the accumulated text back to the client and clears it.
         *
         * @throws IOException if the write fails
         */
        public void execute() throws IOException {
            log.debug(this.buffer.toString());
            writeMessage(this.channel, this.buffer.toString());
            buffer = new StringBuffer();
        }

        /** @return the client channel */
        public SocketChannel getChannel() {
            return this.channel;
        }

        /**
         * Appends newly received text to the accumulated text.
         *
         * @param values text to append
         */
        public void append(String values) {
            buffer.append(values);
        }

        /**
         * @param text text to look for
         * @return true if the accumulated text contains the given text
         */
        public boolean contains(String text) {
            return buffer.indexOf(text) >= 0;
        }
    }

    /**
     * Starts a server on the default port and runs it until a client requests
     * shutdown or the selector fails, then releases all channels.
     */
    public static void main(String[] args) {
        BasicConfigurator.configure();
        NonBlockingServer nbServer = new NonBlockingServer();
        try {
            nbServer.initialize();
        } catch (IOException e) {
            log.error("Could not start server", e);
            System.exit(-1);
        }
        try {
            nbServer.acceptConnections();
        } catch (IOException e) {
            log.error("Server stopped on I/O error", e);
        } catch (InterruptedException e) {
            log.info("Exiting normally...");
        } finally {
            try {
                nbServer.close();
            } catch (IOException e) {
                log.warn("Error while closing server", e);
            }
        }
    }
}
|
 |
 |
|
|
subject: non blocking socket
|
|
|