1 package org.rpi.songcast;
2
3 import java.io.IOException;
4 import java.net.DatagramPacket;
5 import java.net.DatagramSocket;
6 import java.net.InetAddress;
7 import java.net.InetSocketAddress;
8 import java.net.MulticastSocket;
9 import java.net.NetworkInterface;
10 import java.net.SocketAddress;
11 import java.net.UnknownHostException;
12 import java.util.Vector;
13
14 import org.apache.log4j.Logger;
15
16 class UDPSender implements Runnable {
17
18 private Logger log = Logger.getLogger(this.getClass());
19 private boolean run = true;
20 private MulticastSocket mSocket = null;
21
22 private int mcastPort = 0;
23 private InetAddress mcastAddr = null;
24
25 private String zoneID = "";
26 private Vector mWorkQueue = new Vector();
27
28 UDPSender(int port, InetAddress addr, String zoneID) {
29 mcastPort = port;
30 mcastAddr = addr;
31 this.zoneID = zoneID;
32
33 try {
34 mSocket = new MulticastSocket(port);
35 mSocket.setReuseAddress(true);
36 InetAddress inet = InetAddress.getByName("192.168.1.72");
37 NetworkInterface netIf = NetworkInterface.getByInetAddress(inet);
38 mSocket.setNetworkInterface(netIf);
39 mSocket.setSoTimeout(5000);
40 NetworkInterface ifs = mSocket.getNetworkInterface();
41 log.debug("Receiver NetworkInterface: " + ifs.getDisplayName());
42
43
44
45 } catch (IOException ioe) {
46 log.error("problems creating the datagram socket.", ioe);
47 }
48
49
50
51
52
53
54
55 }
56
57 public synchronized boolean isEmpty() {
58 return mWorkQueue.isEmpty();
59 }
60
61 public synchronized void put(Object object) {
62 log.debug("Put Object in WorkQueue " + object.toString());
63 try {
64 mWorkQueue.addElement(object);
65 } catch (Exception e) {
66 log.error(e.getMessage(), e);
67 }
68 }
69
70
71
72
73 public synchronized Object get() {
74 Object object = peek();
75 if (object != null)
76 mWorkQueue.removeElementAt(0);
77 return object;
78 }
79
80
81
82
83 public Object peek() {
84 if (isEmpty())
85 return null;
86 return mWorkQueue.elementAt(0);
87 }
88
89 private void sleep(int value) {
90 try {
91 Thread.sleep(value);
92 } catch (InterruptedException e) {
93 log.error(e.getMessage(), e);
94 }
95 }
96
97 public synchronized void clear() {
98 try {
99 log.info("Clearing Work Queue. Number of Items: " + mWorkQueue.size());
100 mWorkQueue.clear();
101 log.info("WorkQueue Cleared");
102 } catch (Exception e) {
103 log.debug(e.getMessage(), e);
104 }
105 }
106
107 public void run() {
108 while (run) {
109 if (!isEmpty()) {
110 try {
111 byte[] command = (byte[]) get();
112 log.info("Pulled Command Subscription ");
113 processEvent(command);
114 log.info("Number of Commands in Queue: " + mWorkQueue.size());
115 } catch (Exception e) {
116 log.error(e.getMessage(), e);
117 }
118 } else {
119 sleep(100);
120 }
121 }
122 }
123
124 private void processEvent(byte[] bytes) {
125 try {
126 if (bytes == null)
127 return;
128 log.debug("Sending Command to : " + mcastAddr.getHostAddress() + ":" + mcastPort + " bytes: " + bytes);
129 DatagramPacket packet = new DatagramPacket(bytes, bytes.length, mcastAddr, mcastPort);
130 mSocket.send(packet);
131 StringBuilder sb = new StringBuilder();
132 for (byte b : bytes) {
133 sb.append(String.format("%02X ", b));
134 }
135 log.debug("sending multicast message: " + sb.toString());
136 } catch (Exception e) {
137 log.error("Error Sending Message: ", e);
138 }
139 }
140
141 public void disconnect()
142 {
143 if(mSocket !=null)
144 {
145 try {
146 mSocket.leaveGroup(mcastAddr);
147 } catch (IOException e) {
148 log.error("Error Leave Group");
149 }
150 try
151 {
152 mSocket.close();
153 }
154 catch(Exception e)
155 {
156 log.error("Error Close");
157 }
158 }
159 }
160
161 }