package org.frdcsa.unilang.agent; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import com.thoughtworks.xstream.XStream; import org.frdcsa.unilang.util.Message; /** * UniLang agent interface. This class allows an agent to communicate * asynchronously with a UniLang server. */ public class Agent implements Runnable { private String name; private Socket sock; private InputStream input; private BufferedReader reader; private OutputStream output; private List listeners = new ArrayList(); private Thread thread; private AtomicBoolean shutdownFlag = new AtomicBoolean(false); private static XStream xstream = new XStream(); static { xstream.alias("message", Message.class); } /** * Create a new agent. The agent should first be * {@link #connect(String, int)}ed * to the UniLang server and then {@link #start()}ed to begin * dispatching messages. * * @param name The name of this agent. */ public Agent(String name) { this.name = name; if(name == null || name.equals("")) throw new RuntimeException("Agent name must be specified"); } /** * Start the message loop of this agent. A new thread is created and * message dispatching begins. */ public void start() { if(thread != null) throw new IllegalThreadStateException("Agent already running"); thread = new Thread(this, "UniLang-Agent-" + name); thread.setDaemon(true); thread.start(); } /** * Stop message dispatching of the agent. This does not interfere with * the UniLang connection so it's possible to {@link #start()} and stop * the agent while it remains connected. * * @throws InterruptedException If interrupted waiting for thread to stop. 
*/
    public void stop() throws InterruptedException {
        // Work on a snapshot so the null check, the interrupt and the join all
        // refer to the same thread object.
        final Thread worker = thread;
        if(worker == null)
            throw new IllegalThreadStateException("Agent not running");

        // Tell the dispatch loop that the coming interrupt is a shutdown
        // request, then wake it from its sleep.
        shutdownFlag.set(true);
        worker.interrupt();

        // join() instead of a bare wait(): it returns immediately when the
        // worker has already exited (e.g. the socket was closed under it),
        // where an unconditional wait() would block forever, and it cannot
        // return early on a spurious wakeup. This object's monitor is
        // deliberately not held here, so the worker remains free to enter
        // synchronized(this) on its way out.
        // NOTE(review): assumes run() returns once its loop ends; the end of
        // run() is not visible in this part of the file -- confirm.
        worker.join();

        thread = null;
    }

    /** * Implementation of {@link Runnable} interface. */ @Override public void run() { String line; String message = ""; String end = "UNKNOWN"; try { while(true) { // spin slowly until ready to read try { if(!reader.ready()) Thread.sleep(1000); } catch(InterruptedException ex) { if(shutdownFlag.get()) break; // TODO else log interruption } if(!reader.ready()) continue; // read a line line = reader.readLine(); // socket closed under us if(line == null) break; // current protocol is a hack to know how long a message is message += line; if(message.equals(line)) { end = line.replace("<", "