Win a copy of Design for the Mind this week in the Design forum!
  • Post Reply
  • Bookmark Topic Watch Topic
  • New Topic

Multithreading client socket throws Stream corrupted exception

 
sridevi gokul
Greenhorn
Posts: 2
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
Hi,

I have the following client program.

package com.dsf.connectors;

//import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.OutputStreamWriter;
//import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.sql.Time;
import java.util.HashMap;
import java.util.StringTokenizer;

import com.dsf.exception.DcipherException;


public class ClientSocket
{
Socket client = null;
//PrintWriter pWriter = null;
//BufferedInputStream bInStream = null;
InputStreamReader inStream = null;
ObjectOutputStream objOutStream = null;
OutputStreamWriter outStream = null;

public void connectToServer(String ipAddress, String port) throws DcipherException
{
try
{
client = new Socket(ipAddress,Integer.parseInt(port));
}
catch (NumberFormatException e)
{e.printStackTrace();
throw new DcipherException(e.getMessage());
}
catch (UnknownHostException e)
{
e.printStackTrace();
throw new DcipherException(e.getMessage());
}
catch (IOException e)
{
e.printStackTrace();
throw new DcipherException(e.getMessage());
}
}

public void send(String input) throws DcipherException
{
try
{
outStream = new OutputStreamWriter(client.getOutputStream(),"ISO-8859-1");
//pWriter = new PrintWriter(client.getOutputStream());
String clientMessage=input+(char) 13;
outStream.write(clientMessage);
outStream.flush();
System.out.println("Sent to agent");
}
catch (IOException e)
{
throw new DcipherException(e.getMessage());
}
}

public void sendObject(HashMap input) throws DcipherException
{
try
{

ByteArrayOutputStream baos = new ByteArrayOutputStream();
objOutStream = new ObjectOutputStream(baos);
objOutStream.writeObject(input);
objOutStream.flush();
byte[] serObj = baos.toByteArray();
System.out.println("Size of the serialized object = "+serObj.length+" bytes.");

objOutStream = new ObjectOutputStream(client.getOutputStream());
objOutStream.writeObject(input);
objOutStream.flush();
System.out.println("Rule sent to agent");
}
catch (IOException e)
{
e.printStackTrace();
throw new DcipherException(e.getMessage());
}
}

public String receive() throws DcipherException
{
try
{
System.out.println("In client receive");
inStream = new InputStreamReader(client.getInputStream(),"ISO-8859-1");
//bInStream = new BufferedInputStream(inStream);
StringBuffer serverMessage=new StringBuffer();
int c;

while (( c=inStream.read()) != 13)
{
System.out.println("In client receive1");
serverMessage.append((char) c);
System.out.println("In client receive:" +serverMessage.toString());
}
return serverMessage.toString();
}
catch (IOException e)
{
throw new DcipherException(e.getMessage());
}
}

public void close() throws DcipherException
{
try
{
if (objOutStream != null)
objOutStream.close();
if (inStream != null)
inStream.close();
if (outStream != null)
outStream.close();
if (client != null)
client.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}

The Server code is like this.


package com.cts.dsf.agent;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;
import java.util.StringTokenizer;

import com.cts.dsf.filehandle.FileHandler;
import com.cts.dsf.filehandle.VSAMFileHandler;

public class ServerAgent
{
private String lookupDir = null;
private String outputDir = null;
private ServerSocket server = null;
int numConnections = 0;

public ServerAgent()
{
initServer();
}

public void setLookupDir()
{
XMLOperations xml = new XMLOperations();
lookupDir = xml.getLookupDir();

}
public void setOutputDir()
{
XMLOperations xml = new XMLOperations();
outputDir = xml.getOutputDir();

}
public String getLookupDir()
{
return lookupDir;
}
public String getOutputDir()
{
return outputDir;
}
public void initServer()
{
try
{
server = new ServerSocket(4888);
setLookupDir();
setOutputDir();
}
catch (IOException e)
{
e.printStackTrace();
}catch (Exception e)
{
e.printStackTrace();
}
}
public void run()
{
while ( true ) {
try {
Socket clientSocket = server.accept();
numConnections ++;
Server2Connection oneconnection = new Server2Connection(clientSocket, this, numConnections);

new Thread(oneconnection).start();
}
catch (IOException e) {
System.out.println(e);
}
}
}

public static void main(String args[])
{
ServerAgent agent = new ServerAgent();
agent.run();
}
}

class Server2Connection implements Runnable {
BufferedReader is;
Socket cs;
int id;
//ServerAgent server;
//InputStreamReader sReader;
//InputStream inStream ;
FileHandler fileHandle = null;
VSAMFileHandler vsamHandle = null;
private String lookupDir = null;
private String outputDir = null;

public Server2Connection(Socket cs, ServerAgent server, int id) {
this.cs = cs;
//this.server = server;
this.id = id;
this.lookupDir = server.getLookupDir();
this.outputDir = server.getOutputDir();
System.out.println( "Connection " + id + " established with: " + cs );

}

public void run()
{
HashMap ruleMap = null;

try {
System.out.println("Connection from -> " +cs.getInetAddress());
InputStream inStream=cs.getInputStream();
InputStreamReader sReader=new InputStreamReader(inStream,"ISO-8859-1");
StringBuffer taskStr=new StringBuffer();
int id;
char c;
while ((id = sReader.read())!=13)
{
c = (char)id;
taskStr.append(c);
}
System.out.println("For identification ->" + taskStr);
StringTokenizer st = new StringTokenizer(taskStr.toString(),"#");
int len= st.countTokens();
String[] message = new String[len];
for(int i=0;st.hasMoreTokens();i++){

message[i] = (String) st.nextElement();
}
//String[] message = taskStr.toString().split("#");
String ruleName = null;
if(message.length > 1)
ruleName = message[1];
if(taskStr.toString().equalsIgnoreCase("Hello"))
{
sendStatus(cs);
}
else if(taskStr.toString().equalsIgnoreCase("SendFiles"))
{
sendFileList(cs);
}
else if(taskStr.toString().equalsIgnoreCase("ExecuteRule"))
{
ruleMap = getRuleMap(inStream);
System.out.println("ruleMap="+ruleMap);
sendStatus(cs);
String fileType = (String)ruleMap.get("fileType");
if(fileType.equalsIgnoreCase("FLAT"))
{
fileHandle = new FileHandler();
System.out.println("Inside this 1***");
fileHandle.execute(ruleMap);
System.out.println("Inside this 2***");
}
else if(fileType.equalsIgnoreCase("VSAM"))
{
System.out.println("VSAM");
vsamHandle = new VSAMFileHandler();
vsamHandle.execute(ruleMap);
}
}
else if(taskStr.toString().equalsIgnoreCase("SendOutput"))
{
System.out.println("in send output");
ruleMap = getRuleMap(inStream);
String fileType = (String)ruleMap.get("fileType");
String fileName = (String)ruleMap.get("fileName");
String fileInput =(String)ruleMap.get("fileInput");
String fileOutput =(String)ruleMap.get("fileOutput");
String rule = (String)ruleMap.get("ruleName");
System.out.println("File type:" +fileType);
if(fileType.equalsIgnoreCase("FLAT"))
{
String result = getFileResult(rule);
System.out.println("Output:" +result);
sendFileResult(cs,result);
}

}
if(sReader!=null)
sReader.close();
if(inStream!=null)
inStream.close();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
try
{

if (cs != null)
cs.close();
} catch (IOException e1) {
e1.printStackTrace();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}

public void sendAgentDetails()
{
XMLOperations xml = new XMLOperations();
String requestToWebServer = xml.getAgentDetails();
String serverUrl = xml.getServerUrl();
try
{
URL url = new URL(serverUrl);
URLConnection conn = url.openConnection();
conn.setDoOutput(true);
OutputStreamWriter wr = new OutputStreamWriter(conn.getOutputStream());

wr.write(requestToWebServer);
wr.flush();

int code =((HttpURLConnection)conn).getResponseCode();
if(code == 200)
{
System.out.println("Response -> HTTP_OK" );

BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
String line;
while((line = rd.readLine()) != null)
{
System.out.println("response "+line);
}
rd.close();
wr.close();
}
}
catch (MalformedURLException e)
{

e.printStackTrace();
}
catch (IOException e)
{
e.printStackTrace();
} catch (Exception e)
{
e.printStackTrace();
}
}


public String getFileList(String lookupDir)
{
File fl=new File(lookupDir);
String[] filelist=fl.list();
String listOfFile="";
for(int i=0;i<filelist.length;i++)
{
if(i<(filelist.length-1))
{
listOfFile+=filelist[i]+"#";
}
else
{
listOfFile+=filelist[i];
}
}
listOfFile+=(char) 13;
return listOfFile;
}

public String getFileResult(String ruleName)
{
StringBuffer result = new StringBuffer();
try
{
FileReader file = new FileReader(outputDir + ruleName + "_output.txt");
BufferedReader br = new BufferedReader(file);
int i=0;
while((br.readLine()!=null) && i<5)
{
result=result.append(br.readLine());
i=i+1;
}
System.out.println("Result:" + result);
return (new String(result)+13);
}
catch (FileNotFoundException e)
{
e.printStackTrace();
} catch (IOException e)
{
e.printStackTrace();
} catch (Exception e)
{
e.printStackTrace();
}
return (new String(result)+13);
}



public void sendStatus(Socket cs)
{
String status="OK"+(char) 13;
OutputStreamWriter sendstatus;
System.out.println("Inside sendStatus");
try
{
sendstatus = new OutputStreamWriter(cs.getOutputStream(),"ISO-8859-1");
sendstatus.write(status);
sendstatus.flush();
sendstatus.close();
} catch (IOException e)
{
e.printStackTrace();
}
catch (Exception e)
{
e.printStackTrace();
}
System.out.println("Exiting sendStatus");
}

public void sendFileList(Socket cs)
{
String fileList=getFileList(lookupDir);
OutputStreamWriter sendFiles;
try
{
sendFiles = new OutputStreamWriter(cs.getOutputStream(),"ISO-8859-1");
sendFiles.write(fileList);
sendFiles.flush();
sendFiles.close();
} catch (IOException e)
{
e.printStackTrace();
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void sendFileResult(Socket cs,String result)
{
OutputStreamWriter sendFiles;
try
{
sendFiles = new OutputStreamWriter(cs.getOutputStream(),"ISO-8859-1");
sendFiles.write(result);
sendFiles.flush();
sendFiles.close();
} catch (IOException e)
{
e.printStackTrace();
} catch (Exception e)
{
e.printStackTrace();
}
}

public HashMap getRuleMap(InputStream inStream)
{
HashMap ruleMap = null;
ObjectInputStream obj = null;
try
{
obj= new ObjectInputStream(inStream);

ruleMap = (HashMap)obj.readObject();

} catch (IOException e)
{
e.printStackTrace();
} catch (ClassNotFoundException e)
{
e.printStackTrace();
}
catch (Exception e)
{
e.printStackTrace();
}
return ruleMap;
}

When there is only one client then it is working fine. But If i have another browser or multiple clients working on this then I get the Stream corrupted Exception. The behaviour is inconsistent.

java.io.StreamCorruptedException: invalid stream header
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:767)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:283)
at com.cts.dsf.agent.Server2Connection.getRuleMap(ServerAgent.java:390)
at com.cts.dsf.agent.Server2Connection.run(ServerAgent.java:169)
at java.lang.Thread.run(Thread.java:568)
java.lang.NullPointerException
at com.cts.dsf.agent.Server2Connection.run(ServerAgent.java:172)
at java.lang.Thread.run(Thread.java:568)





}


Can anyone help me out to find what the problem is?

Thanks In advance.
 
Stan James
(instanceof Sidekick)
Ranch Hand
Posts: 8791
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
Hi, welcome to the ranch!

In the future, use the "CODE" button below the post editor to insert tags that preserve your indenting. It's pretty hard to follow code that's all left justified like that.

That's a bit long to dig into. Do you have a good idea where things are going badly? It sounds like two threads might be sharing your reader. Any chance of that?
 
sridevi gokul
Greenhorn
Posts: 2
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
Sorry for that. Will do that from the next posting.

The reader is initialized only after the thread is started.
In that case ideally two threads will have two different reader to work with I guess.

I find this streamcorrupted exception caused when the ObjectInputStream is read from the server end.
 
  • Post Reply
  • Bookmark Topic Watch Topic
  • New Topic