@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( access.addAndGet(1) == 1 ) txStart = System.currentTimeMillis(); long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength()); try { super.sendMessage(destination, msg, payload); }catch ( ChannelException x ) { msgTxErr.addAndGet(1); if ( access.get() == 1 ) access.addAndGet(-1); throw x; } mbTx += (bytes*destination.length)/(1024d*1024d); mbAppTx += bytes/(1024d*1024d); if ( access.addAndGet(-1) == 0 ) { long stop = System.currentTimeMillis(); timeTx += (stop - txStart) / 1000d; if ((msgTxCnt.get() / interval) >= lastCnt) { lastCnt++; report(timeTx); } } msgTxCnt.addAndGet(1); }
@Override public void broadcast(ChannelMessage message) throws ChannelException { if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ ) throw new ChannelException("Multicast send is not started or enabled."); byte[] data = XByteBuffer.createDataPackage((ChannelData)message); if (data.length>McastServiceImpl.MAX_PACKET_SIZE) { throw new ChannelException("Packet length["+data.length+"] exceeds max packet size of "+McastServiceImpl.MAX_PACKET_SIZE+" bytes."); } DatagramPacket packet = new DatagramPacket(data,0,data.length); try { impl.send(false, packet); } catch (Exception x) { throw new ChannelException(x); } }
@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if (access.addAndGet(1) == 1) txStart = System.currentTimeMillis(); long bytes = XByteBuffer.getDataPackageLength(((ChannelData) msg).getDataPackageLength()); try { super.sendMessage(destination, msg, payload); } catch (ChannelException x) { msgTxErr.addAndGet(1); if (access.get() == 1) access.addAndGet(-1); throw x; } mbTx += (bytes * destination.length) / (1024d * 1024d); mbAppTx += bytes / (1024d * 1024d); if (access.addAndGet(-1) == 0) { long stop = System.currentTimeMillis(); timeTx += (stop - txStart) / 1000d; if ((msgTxCnt.get() / interval) >= lastCnt) { lastCnt++; report(timeTx); } } msgTxCnt.addAndGet(1); }
@Override public void broadcast(ChannelMessage message) throws ChannelException { if (impl == null || (impl.startLevel & Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ) throw new ChannelException("Multicast send is not started or enabled."); byte[] data = XByteBuffer.createDataPackage((ChannelData) message); if (data.length > McastServiceImpl.MAX_PACKET_SIZE) { throw new ChannelException("Packet length[" + data.length + "] exceeds max packet size of " + McastServiceImpl.MAX_PACKET_SIZE + " bytes."); } DatagramPacket packet = new DatagramPacket(data, 0, data.length); try { impl.send(false, packet); } catch (Exception x) { throw new ChannelException(x); } }
@Override public void messageReceived(ChannelMessage msg) { if ( rxStart == 0 ) rxStart = System.currentTimeMillis(); long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength()); mbRx += bytes/(1024d*1024d); msgRxCnt.addAndGet(1); if ( msgRxCnt.get() % interval == 0 ) report(timeTx); super.messageReceived(msg); }
public ChannelData createData(CoordinationMessage msg, MemberImpl local) { msg.write(); ChannelData data = new ChannelData(true); data.setAddress(local); data.setMessage(msg.getBuffer()); data.setOptions(Channel.SEND_OPTIONS_USE_ACK); data.setTimestamp(System.currentTimeMillis()); return data; }
protected void sendPingMessage(Member[] members) { if ( members == null || members.length == 0 ) return; ChannelData data = new ChannelData(true);//generates a unique Id data.setAddress(getLocalMember(false)); data.setTimestamp(System.currentTimeMillis()); data.setOptions(getOptionFlag()); data.setMessage(new XByteBuffer(TCP_PING_DATA, false)); try { super.sendMessage(members, data, null); }catch (ChannelException x) { log.warn("Unable to send TCP ping.",x); } }
protected void sendMemberMessage(Member[] members, byte[] message) throws ChannelException { if ( members == null || members.length == 0 ) return; ChannelData data = new ChannelData(true); data.setAddress(getLocalMember(false)); data.setTimestamp(System.currentTimeMillis()); data.setOptions(getOptionFlag()); data.setMessage(new XByteBuffer(message, false)); super.sendMessage(members, data, null); }
protected void execute(ObjectReader reader) throws Exception{ int pkgcnt = reader.count(); if ( pkgcnt > 0 ) { ChannelMessage[] msgs = reader.execute(); for ( int i=0; i<msgs.length; i++ ) { /** * Use send ack here if you want to ack the request to the remote * server before completing the request * This is considered an asynchronized request */ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); try { //process the message getCallback().messageDataReceived(msgs[i]); /** * Use send ack here if you want the request to complete on this * server before sending the ack to the remote server * This is considered a synchronized request */ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); }catch ( Exception x ) { if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND); log.error("Error thrown from messageDataReceived.",x); } if ( getUseBufferPool() ) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); msgs[i].setMessage(null); } } } }
public static void main(String[] args) throws Exception { Member mbr = new MemberImpl("localhost", 9999, 0); ChannelData data = new ChannelData(); data.setAddress(mbr); byte[] buf = new byte[8192 * 4]; data.setMessage(new XByteBuffer(buf, false)); buf = XByteBuffer.createDataPackage(data); len = buf.length; NioReceiver receiver = new NioReceiver(); receiver.setPort(9999); receiver.setHost("localhost"); MyList list = new MyList(); receiver.setMessageListener(list); receiver.start(); System.out.println("Listening on 9999"); while (true) { try { synchronized (mutex) { mutex.wait(5000); if ( start != 0 ) { System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, messages "+count+" accepts "+accept+", total "+mb+" MB."); } } }catch (Throwable x) { x.printStackTrace(); } } }
public synchronized ChannelData getMessage(Member mbr) { String msg = "Thread-"+Thread.currentThread().getName()+" Message:"+inc(); ChannelData data = new ChannelData(true); data.setMessage(new XByteBuffer(msg.getBytes(),false)); data.setAddress(mbr); return data; }
@Override public void messageReceived(ChannelMessage msg) { if (rxStart == 0) rxStart = System.currentTimeMillis(); long bytes = XByteBuffer.getDataPackageLength(((ChannelData) msg).getDataPackageLength()); mbRx += bytes / (1024d * 1024d); msgRxCnt.addAndGet(1); if (msgRxCnt.get() % interval == 0) report(timeTx); super.messageReceived(msg); }
protected void sendPingMessage(Member[] members) { if (members == null || members.length == 0) return; ChannelData data = new ChannelData(true);// generates a unique Id data.setAddress(getLocalMember(false)); data.setTimestamp(System.currentTimeMillis()); data.setOptions(getOptionFlag()); data.setMessage(new XByteBuffer(TCP_PING_DATA, false)); try { super.sendMessage(members, data, null); } catch (ChannelException x) { log.warn("Unable to send TCP ping.", x); } }
protected void sendMemberMessage(Member[] members, byte[] message) throws ChannelException { if (members == null || members.length == 0) return; ChannelData data = new ChannelData(true); data.setAddress(getLocalMember(false)); data.setTimestamp(System.currentTimeMillis()); data.setOptions(getOptionFlag()); data.setMessage(new XByteBuffer(message, false)); super.sendMessage(members, data, null); }
protected void execute(ObjectReader reader) throws Exception { int pkgcnt = reader.count(); if (pkgcnt > 0) { ChannelMessage[] msgs = reader.execute(); for (int i = 0; i < msgs.length; i++) { /** * Use send ack here if you want to ack the request to the * remote server before completing the request This is * considered an asynchronous request */ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); try { // process the message getCallback().messageDataReceived(msgs[i]); /** * Use send ack here if you want the request to complete on * this server before sending the ack to the remote server * This is considered a synchronized request */ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); } catch (Exception x) { if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND); log.error("Error thrown from messageDataReceived.", x); } if (getUseBufferPool()) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); msgs[i].setMessage(null); } } } }
@Override public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { byte[] data = XByteBuffer.createDataPackage((ChannelData)msg); BioSender[] senders = setupForSend(destination); ChannelException cx = null; for ( int i=0; i<senders.length; i++ ) { try { senders[i].sendMessage(data,(msg.getOptions()&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK); } catch (Exception x) { if (cx == null) cx = new ChannelException(x); cx.addFaultyMember(destination[i],x); } } if (cx!=null ) throw cx; }