public V put(K key, V value, boolean notify) { MapEntry<K,V> entry = new MapEntry<K,V>(key,value); entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); entry.setPrimary(channel.getLocalMember(false)); V old = null; //make sure that any old values get removed if ( containsKey(key) ) old = remove(key); try { if ( notify ) { Member[] backup = publishEntryInfo(key, value); entry.setBackupNodes(backup); } } catch (ChannelException x) { log.error("Unable to replicate out data for a AbstractReplicatedMap.put operation", x); } innerMap.put(key,entry); return old; }
@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { super.sendMessage(destination, msg, payload); } catch (ChannelException cx) { FaultyMember[] mbrs = cx.getFaultyMembers(); for (int i = 0; i < mbrs.length; i++) { if (mbrs[i].getCause() != null && (!(mbrs[i].getCause() instanceof RemoteProcessException))) {// RemoteProcessException's // are // ok this.memberDisappeared(mbrs[i].getMember()); } // end if } // for throw cx; } }
@Test public void testTcpMcastFail() throws Exception { System.out.println("testTcpMcastFail()"); clear(); channel1.start(Channel.DEFAULT); channel2.start(Channel.DEFAULT); //Thread.sleep(1000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel2.stop(Channel.MBR_TX_SEQ); ByteMessage msg = new ByteMessage(new byte[1024]); try { Thread.sleep(5000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel1.send(channel1.getMembers(), msg, 0); } catch ( ChannelException x ) { fail("Message send should have succeeded."); } channel1.stop(Channel.DEFAULT); channel2.stop(Channel.DEFAULT); }
/**
 * Sends the message through the parent implementation while recording
 * throughput statistics.
 * <p>
 * {@code access} counts the sends currently in flight. The first send to enter
 * records {@code txStart}; the last one to leave adds the elapsed time to
 * {@code timeTx} and may trigger a report.
 *
 * @param destination the members to send to
 * @param msg         the message to send; must be a {@code ChannelData}
 * @param payload     the interceptor payload, passed through unchanged
 * @throws ChannelException if the delegated send fails; the error counter is
 *                          incremented before the exception is rethrown
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    if (access.addAndGet(1) == 1) {
        txStart = System.currentTimeMillis();
    }
    long bytes = XByteBuffer.getDataPackageLength(((ChannelData) msg).getDataPackageLength());
    try {
        super.sendMessage(destination, msg, payload);
    } catch (ChannelException x) {
        msgTxErr.addAndGet(1);
        // Always give back the slot taken on entry. Decrementing only when the
        // counter is exactly 1 leaked the slot whenever another send was in
        // flight, after which the counter never returned to 0 and timing and
        // reporting stopped.
        access.addAndGet(-1);
        throw x;
    }
    mbTx += (bytes * destination.length) / (1024d * 1024d);
    mbAppTx += bytes / (1024d * 1024d);
    if (access.addAndGet(-1) == 0) {
        // Last in-flight send: close the timing window opened at txStart.
        long stop = System.currentTimeMillis();
        timeTx += (stop - txStart) / 1000d;
        if ((msgTxCnt.get() / interval) >= lastCnt) {
            lastCnt++;
            report(timeTx);
        }
    }
    msgTxCnt.addAndGet(1);
}
/**
 * Sends the election message to the member following {@code local} in the
 * message's member list. When a send fails, the member after the failed one is
 * tried; once the search arrives back at the first candidate the last failure
 * is rethrown. The message's leader is set to the first listed member before
 * any send is attempted.
 *
 * @param local the local member
 * @param msg   the election message to forward
 * @throws ChannelException if no candidate could be reached
 */
protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg)
        throws ChannelException {
    final int firstCandidate = Arrays.nextIndex(local, msg.getMembers());
    msg.leader = msg.getMembers()[0];

    int candidate = firstCandidate;
    while (candidate >= 0) {
        try {
            sendElectionMsg(local, msg.getMembers()[candidate], msg);
            return;
        } catch (ChannelException x) {
            log.warn("Unable to send election message to:" + msg.getMembers()[candidate]);
            candidate = Arrays.nextIndex(msg.getMembers()[candidate], msg.getMembers());
            if (candidate == firstCandidate) {
                // Every candidate has been tried.
                throw x;
            }
        }
    }
}
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (local.equals(msg.getLeader())) { // no leadership change if (Arrays.sameMembers(msg.getMembers(), merged.getMembers())) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(), local), createData(msg, local), null); handleViewConf(msg, local, merged); } else { // membership change suggestedView = new Membership(local, AbsoluteOrder.comp, true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView, merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } } else { // leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } }
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //no leadership change if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null); handleViewConf(msg,local,merged); } else { //membership change suggestedView = new Membership(local,AbsoluteOrder.comp,true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView,merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } else { //leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); Arrays.fill(view,msg.getMembers()); viewId = msg.getId(); if ( viewId.equals(suggestedviewId) ) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { suggestedView = null; suggestedviewId = null; } viewChange(viewId,view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); } }
@Test public void testTcpSendFailureMemberDrop() throws Exception { System.out.println("testTcpSendFailureMemberDrop()"); clear(); channel1.start(Channel.DEFAULT); channel2.start(Channel.DEFAULT); //Thread.sleep(1000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel2.stop(Channel.SND_RX_SEQ); ByteMessage msg = new ByteMessage(new byte[1024]); try { channel1.send(channel1.getMembers(), msg, 0); fail("Message send should have failed."); } catch ( ChannelException x ) { // Ignore } assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size()); channel1.stop(Channel.DEFAULT); channel2.stop(Channel.DEFAULT); }
@Override public void messageReceived(ChannelMessage msg) { if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { //ignore message, its an alive message fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message")); } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { try { CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); Member[] cmbr = cmsg.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(cmbr)+")")); processCoordMessage(cmsg, msg.getAddress()); }catch ( ChannelException x ) { log.error("Error processing coordination message. Could be fatal.",x); } } else { super.messageReceived(msg); } }
/**
 * Sends the message as a single datagram through the multicast
 * implementation.
 *
 * @param message the message to send; must be a {@code ChannelData}
 * @throws ChannelException if the implementation is missing or was not started
 *                          with {@code Channel.MBR_TX_SEQ}, if the serialized
 *                          message exceeds
 *                          {@code McastServiceImpl.MAX_PACKET_SIZE}, or if the
 *                          send itself fails (the cause is wrapped)
 */
@Override
public void broadcast(ChannelMessage message) throws ChannelException {
    boolean senderStarted =
            impl != null && (impl.startLevel & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ;
    if (!senderStarted) {
        throw new ChannelException("Multicast send is not started or enabled.");
    }

    byte[] data = XByteBuffer.createDataPackage((ChannelData) message);
    if (data.length > McastServiceImpl.MAX_PACKET_SIZE) {
        throw new ChannelException("Packet length[" + data.length
                + "] exceeds max packet size of " + McastServiceImpl.MAX_PACKET_SIZE + " bytes.");
    }

    DatagramPacket packet = new DatagramPacket(data, 0, data.length);
    try {
        impl.send(false, packet);
    } catch (Exception x) {
        throw new ChannelException(x);
    }
}
/**
 * Sends the message through the parent implementation while recording
 * throughput statistics.
 * <p>
 * {@code access} counts the sends currently in flight. The first send to enter
 * records {@code txStart}; the last one to leave adds the elapsed time to
 * {@code timeTx} and may trigger a report.
 *
 * @param destination the members to send to
 * @param msg         the message to send; must be a {@code ChannelData}
 * @param payload     the interceptor payload, passed through unchanged
 * @throws ChannelException if the delegated send fails; the error counter is
 *                          incremented before the exception is rethrown
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    if (access.addAndGet(1) == 1) {
        txStart = System.currentTimeMillis();
    }
    long bytes = XByteBuffer.getDataPackageLength(((ChannelData) msg).getDataPackageLength());
    try {
        super.sendMessage(destination, msg, payload);
    } catch (ChannelException x) {
        msgTxErr.addAndGet(1);
        // Always give back the slot taken on entry. Decrementing only when the
        // counter is exactly 1 leaked the slot whenever another send was in
        // flight, after which the counter never returned to 0 and timing and
        // reporting stopped.
        access.addAndGet(-1);
        throw x;
    }
    mbTx += (bytes * destination.length) / (1024d * 1024d);
    mbAppTx += bytes / (1024d * 1024d);
    if (access.addAndGet(-1) == 0) {
        // Last in-flight send: close the timing window opened at txStart.
        long stop = System.currentTimeMillis();
        timeTx += (stop - txStart) / 1000d;
        if ((msgTxCnt.get() / interval) >= lastCnt) {
            lastCnt++;
            report(timeTx);
        }
    }
    msgTxCnt.addAndGet(1);
}
/**
 * Adds two interceptors that use the same option flag and expects starting the
 * channel to fail with a message mentioning "option flag conflict".
 */
@Test
public void testOptionConflict() throws Exception {
    channel.setOptionCheck(true);

    ChannelInterceptor first = new TestInterceptor();
    first.setOptionFlag(128);
    channel.addInterceptor(first);

    ChannelInterceptor second = new TestInterceptor();
    second.setOptionFlag(128);
    channel.addInterceptor(second);

    boolean conflictReported = false;
    try {
        channel.start(Channel.DEFAULT);
    } catch (ChannelException x) {
        conflictReported = x.getMessage().contains("option flag conflict");
    }
    assertTrue(conflictReported);
}
@Override public void start(int svc) throws ChannelException { // start the thread if (!run) { synchronized (this) { if (!run && ((svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ)) {// only // start // with // the // sender startQueue(); } // end if } // sync } // end if super.start(svc); }
/**
 * Serializes the message and multicasts it in one datagram.
 *
 * @param message the message to send; must be a {@code ChannelData}
 * @throws ChannelException when multicast sending has not been started, when
 *                          the serialized message is larger than
 *                          {@code McastServiceImpl.MAX_PACKET_SIZE}, or when
 *                          the underlying send throws (the cause is wrapped)
 */
@Override
public void broadcast(ChannelMessage message) throws ChannelException {
    if (impl == null) {
        throw new ChannelException("Multicast send is not started or enabled.");
    }
    if ((impl.startLevel & Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ) {
        throw new ChannelException("Multicast send is not started or enabled.");
    }

    final byte[] data = XByteBuffer.createDataPackage((ChannelData) message);
    final int length = data.length;
    if (length > McastServiceImpl.MAX_PACKET_SIZE) {
        throw new ChannelException("Packet length[" + length + "] exceeds max packet size of "
                + McastServiceImpl.MAX_PACKET_SIZE + " bytes.");
    }

    try {
        impl.send(false, new DatagramPacket(data, 0, length));
    } catch (Exception x) {
        throw new ChannelException(x);
    }
}
/**
 * Adds three interceptors with the distinct option flags 128, 64 and 256 and
 * expects that starting the channel does not report an option flag conflict.
 */
@Test
public void testOptionNoConflict() throws Exception {
    channel.setOptionCheck(true);

    final int[] distinctFlags = {128, 64, 256};
    for (int flag : distinctFlags) {
        ChannelInterceptor interceptor = new TestInterceptor();
        interceptor.setOptionFlag(flag);
        channel.addInterceptor(interceptor);
    }

    boolean conflictReported = false;
    try {
        channel.start(Channel.DEFAULT);
    } catch (ChannelException x) {
        conflictReported = x.getMessage().contains("option flag conflict");
    }
    assertFalse(conflictReported);
}
/** * Helper method to broadcast a message to all members in a channel * * @param msgtype * int * @param rpc * boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0) return; // send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if (rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(), msg, channelSendOptions); } }
/** * Helper method to broadcast a message to all members in a channel * @param msgtype int * @param rpc boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0 ) return; //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if ( rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(),msg,channelSendOptions); } }
@Override public synchronized void start(int svc) throws ChannelException { super.start(svc); running = true; if (thread == null && useThread) { thread = new PingThread(); thread.setDaemon(true); String channelName = ""; if (getChannel() instanceof GroupChannel && ((GroupChannel) getChannel()).getName() != null) { channelName = "[" + ((GroupChannel) getChannel()).getName() + "]"; } thread.setName("TcpPingInterceptor.PingThread" + channelName + "-" + cnt.addAndGet(1)); thread.start(); } // acquire the interceptors to invoke on send ping events ChannelInterceptor next = getNext(); while (next != null) { if (next instanceof TcpFailureDetector) failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector) next); if (next instanceof StaticMembershipInterceptor) staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor) next); next = next.getNext(); } }
public V put(K key, V value, boolean notify) { MapEntry<K, V> entry = new MapEntry<K, V>(key, value); entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); entry.setPrimary(channel.getLocalMember(false)); V old = null; // make sure that any old values get removed if (containsKey(key)) old = remove(key); try { if (notify) { Member[] backup = publishEntryInfo(key, value); entry.setBackupNodes(backup); } } catch (ChannelException x) { log.error("Unable to replicate out data for a AbstractReplicatedMap.put operation", x); } innerMap.put(key, entry); return old; }
@Override public void memberDisappeared(Member member) { try { membership.removeMember((MemberImpl) member); super.memberDisappeared(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL, this, "Member remove(" + member.getName() + ")")); if (started && (isCoordinator() || isHighest())) startElection(true); // to do, if a member disappears, only // the coordinator can start } catch (ChannelException x) { log.error("Unable to start election when member was removed.", x); } } finally { } }
/**
 * Stops the coordinator. {@code halt()} is called first and {@code release()}
 * is always called on the way out. If the interceptor is started, then under
 * {@code electionMutex} it is marked as stopped, the parent is stopped, the
 * current and suggested views are cleared and the membership is reset, with a
 * stop event fired before and after.
 * <p>
 * NOTE(review): the parent is stopped with the {@code startsvc} field; the
 * {@code svc} argument is not used in this method. Confirm that is intended.
 *
 * @param svc the services being stopped
 * @throws ChannelException if the parent stop fails
 */
@Override
public void stop(int svc) throws ChannelException {
    try {
        halt();
        synchronized (electionMutex) {
            if (started) {
                started = false;
                fireInterceptorEvent(
                        new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
                super.stop(startsvc);
                this.view = null;
                this.viewId = null;
                this.suggestedView = null;
                this.suggestedviewId = null;
                this.membership.reset();
                fireInterceptorEvent(
                        new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
            }
        }
    } finally {
        release();
    }
}
@Override public void messageReceived(ChannelMessage msg) { if (Arrays.contains(msg.getMessage().getBytesDirect(), 0, COORD_ALIVE, 0, COORD_ALIVE.length)) { // ignore message, its an alive message fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE, this, "Alive Message")); } else if (Arrays.contains(msg.getMessage().getBytesDirect(), 0, COORD_HEADER, 0, COORD_HEADER.length)) { try { CoordinationMessage cmsg = new CoordinationMessage(msg.getMessage()); Member[] cmbr = cmsg.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE, this, "Coord Msg Arrived(" + Arrays.toNameString(cmbr) + ")")); processCoordMessage(cmsg, msg.getAddress()); } catch (ChannelException x) { log.error("Error processing coordination message. Could be fatal.", x); } } else { super.messageReceived(msg); } }
@Override public void memberDisappeared(Member member) { try { membership.removeMember((MemberImpl)member); super.memberDisappeared(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")")); if ( started && (isCoordinator() || isHighest()) ) startElection(true); //to do, if a member disappears, only the coordinator can start }catch ( ChannelException x ) { log.error("Unable to start election when member was removed.",x); } }finally { } }
@Override public void stop(int svc) throws ChannelException { // stop the thread if (run) { synchronized (this) { if (run && ((svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ)) { stopQueue(); } // end if } // sync } // end if super.stop(svc); }
public void mapMemberAdded(Member member) { if (member.equals(getChannel().getLocalMember(false))) return; boolean memberAdded = false; // select a backup node if we don't have one Member mapMember = getChannel().getMember(member); if (mapMember == null) { log.warn("Notified member is not registered in the membership:" + member); return; } synchronized (mapMembers) { if (!mapMembers.containsKey(mapMember)) { if (log.isInfoEnabled()) log.info("Map member added:" + mapMember); mapMembers.put(mapMember, Long.valueOf(System.currentTimeMillis())); memberAdded = true; } } if (memberAdded) { synchronized (stateMutex) { Iterator<Map.Entry<K, MapEntry<K, V>>> i = innerMap.entrySet().iterator(); while (i.hasNext()) { Map.Entry<K, MapEntry<K, V>> e = i.next(); MapEntry<K, V> entry = innerMap.get(e.getKey()); if (entry == null) continue; if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { log.error("Unable to select backup node.", x); } // catch } // end if } // while } // synchronized } // end if }
/**
 * Removes the entry for the key from the inner map and, when requested and map
 * members exist, sends a {@code MSG_REMOVE} message to them. A failure to send
 * is logged and does not affect the return value.
 *
 * @param key    the key to remove
 * @param notify {@code true} to tell the other map members about the removal
 * @return the value of the removed entry, or {@code null} if there was none
 */
public V remove(Object key, boolean notify) {
    MapEntry<K,V> removed = innerMap.remove(key);

    try {
        if (getMapMembers().length > 0 && notify) {
            MapMessage removal = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE,
                    false, (Serializable) key, null, null, null, null);
            getChannel().send(getMapMembers(), removal, getChannelSendOptions());
        }
    } catch (ChannelException x) {
        log.error("Unable to replicate out data for a AbstractReplicatedMap.remove operation", x);
    }

    if (removed == null) {
        return null;
    }
    return removed.getValue();
}
@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if (!okToProcess(msg.getOptions())) { super.sendMessage(destination, msg, payload); return; } ChannelException cx = null; for (int i = 0; i < destination.length; i++) { try { int nr = 0; try { outLock.writeLock().lock(); nr = incCounter(destination[i]); } finally { outLock.writeLock().unlock(); } // reduce byte copy msg.getMessage().append(nr); try { getNext().sendMessage(new Member[] { destination[i] }, msg, payload); } finally { msg.getMessage().trim(4); } } catch (ChannelException x) { if (cx == null) cx = x; cx.addFaultyMember(x.getFaultyMembers()); } } // for if (cx != null) throw cx; }
/**
 * Sends a message to one or more members in the cluster. Messages carrying
 * {@code Channel.SEND_OPTIONS_MULTICAST} are broadcast through the membership
 * service; all others go through the cluster sender.
 *
 * @param destination Member[] - the destinations; {@code null} is replaced by
 *                    the membership service's current members
 * @param msg         ClusterMessage - the message to send
 * @param payload     TBA (not used here)
 * @throws ChannelException if the send fails
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    if (destination == null) {
        destination = membershipService.getMembers();
    }

    final boolean multicast =
            (msg.getOptions() & Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST;
    if (multicast) {
        membershipService.broadcast(msg);
    } else {
        clusterSender.sendMessage(msg, destination);
    }

    if (Logs.MESSAGES.isTraceEnabled()) {
        Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId())
                + " at " + new java.sql.Timestamp(System.currentTimeMillis())
                + " to " + Arrays.toNameString(destination));
    }
}
/**
 * Stops the channel. A running heartbeat thread is stopped and forgotten
 * before the parent is stopped.
 *
 * @param svc int - the services to stop
 * @throws ChannelException if the parent stop fails
 * @see org.apache.catalina.tribes.Channel#stop(int)
 */
@Override
public synchronized void stop(int svc) throws ChannelException {
    final boolean heartbeatRunning = hbthread != null;
    if (heartbeatRunning) {
        hbthread.stopHeartbeat();
        hbthread = null;
    }
    super.stop(svc);
}
/** * Send a message and wait for the response. * @param destination Member[] - the destination for the message, and the members you request a reply from * @param message Serializable - the message you are sending out * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY * @param channelOptions channel sender options * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned * @return Response[] - an array of response objects. * @throws ChannelException */ public Response[] send(Member[] destination, Serializable message, int rpcOptions, int channelOptions, long timeout) throws ChannelException { if ( destination==null || destination.length == 0 ) return new Response[0]; //avoid dead lock int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length); try { synchronized (collector) { if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector); RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); channel.send(destination, rmsg, sendOptions); if ( rpcOptions != NO_REPLY ) collector.wait(timeout); } } catch ( InterruptedException ix ) { Thread.currentThread().interrupt(); } finally { responseMap.remove(key); } return collector.getResponses(); }
/** * Validates the option flags that each interceptor is using and reports * an error if two interceptor share the same flag. * @throws ChannelException */ protected void checkOptionFlags() throws ChannelException { StringBuilder conflicts = new StringBuilder(); ChannelInterceptor first = interceptors; while ( first != null ) { int flag = first.getOptionFlag(); if ( flag != 0 ) { ChannelInterceptor next = first.getNext(); while ( next != null ) { int nflag = next.getOptionFlag(); if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) { conflicts.append("["); conflicts.append(first.getClass().getName()); conflicts.append(":"); conflicts.append(flag); conflicts.append(" == "); conflicts.append(next.getClass().getName()); conflicts.append(":"); conflicts.append(nflag); conflicts.append("] "); }//end if next = next.getNext(); }//while }//end if first = first.getNext(); }//while if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString()); }
/**
 * Starts the channel. The default stack is set up, the interceptor option
 * flags are checked when {@code optionCheck} is enabled, and the parent is
 * started. A heartbeat thread is then created and started if heartbeats are
 * enabled and none exists yet.
 *
 * @param svc int - what service to start
 * @throws ChannelException if the option check or the parent start fails
 * @see org.apache.catalina.tribes.Channel#start(int)
 */
@Override
public synchronized void start(int svc) throws ChannelException {
    setupDefaultStack();
    if (optionCheck) {
        checkOptionFlags();
    }
    super.start(svc);

    final boolean needsHeartbeat = hbthread == null && heartbeat;
    if (needsHeartbeat) {
        hbthread = new HeartbeatThread(this, heartbeatSleeptime);
        hbthread.start();
    }
}
/**
 * Stops the channel, first stopping and discarding the heartbeat thread when
 * there is one.
 *
 * @param svc int - the services to stop
 * @throws ChannelException if the parent stop fails
 * @see org.apache.catalina.tribes.Channel#stop(int)
 */
@Override
public synchronized void stop(int svc) throws ChannelException {
    if (hbthread == null) {
        // No heartbeat thread to shut down.
        super.stop(svc);
        return;
    }
    hbthread.stopHeartbeat();
    hbthread = null;
    super.stop(svc);
}
@Override public void start(int svc) throws ChannelException { //start the thread if (!run ) { synchronized (this) { if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender startQueue(); }//end if }//sync }//end if super.start(svc); }