/** * Cluster member is gone * * @see org.apache.catalina.tribes.MembershipListener#memberDisappeared(org.apache.catalina.tribes.Member) */ @Override public void memberDisappeared(Member member) { try { hasMembers = channel.hasMembers(); if (log.isInfoEnabled()) log.info("Received member disappeared:" + member); // Notify our interested LifecycleListeners fireLifecycleEvent(BEFORE_MEMBERUNREGISTER_EVENT, member); unregisterMember(member); // Notify our interested LifecycleListeners fireLifecycleEvent(AFTER_MEMBERUNREGISTER_EVENT, member); } catch (Exception x) { log.error("Unable remove cluster node from replication system.", x); } }
/** * New cluster member is registered * * @see org.apache.catalina.tribes.MembershipListener#memberAdded(org.apache.catalina.tribes.Member) */ @Override public void memberAdded(Member member) { try { hasMembers = channel.hasMembers(); if (log.isInfoEnabled()) log.info("Replication member added:" + member); // Notify our interested LifecycleListeners fireLifecycleEvent(BEFORE_MEMBERREGISTER_EVENT, member); registerMember(member); // Notify our interested LifecycleListeners fireLifecycleEvent(AFTER_MEMBERREGISTER_EVENT, member); } catch (Exception x) { log.error("Unable to connect to replication system.", x); } }
/**
 * Sends a cluster message to a single member, or to every current channel
 * member when no destination is given. The local member is stamped on the
 * message as its address before sending. Failures are logged and not
 * propagated.
 *
 * @param msg  message to transfer
 * @param dest receiver member; {@code null} sends to all channel members
 * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage,
 *      org.apache.catalina.tribes.Member)
 */
@Override
public void send(ClusterMessage msg, Member dest) {
    try {
        msg.setAddress(getLocalMember());

        int options = channelSendOptions;
        boolean allSessionData = msg instanceof SessionMessage
                && ((SessionMessage) msg).getEventType() == SessionMessage.EVT_ALL_SESSION_DATA;
        if (allSessionData) {
            // full session transfers override the configured send options
            options = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
        }

        if (dest == null) {
            Member[] everyone = channel.getMembers();
            if (everyone.length > 0) {
                channel.send(everyone, msg, options);
            } else if (log.isDebugEnabled()) {
                log.debug("No members in cluster, ignoring message:" + msg);
            }
            return;
        }

        if (getLocalMember().equals(dest)) {
            log.error("Unable to send message to local member " + msg);
        } else {
            channel.send(new Member[] { dest }, msg, options);
        }
    } catch (Exception failure) {
        log.error("Unable to send message through cluster sender.", failure);
    }
}
/**
 * Checks that every channel reports the same number of members as the first
 * channel and that all coordinators agree on the elected coordinator. The
 * first coordinator is polled (up to 100 times, 100 ms apart) until it
 * reports a non-null coordinator.
 */
@Test
public void testCoord1() throws Exception {
    final int expectedCount = channels[0].getMembers().length;
    for (int idx = 1; idx < CHANNEL_COUNT; idx++) {
        assertEquals("Message count expected to be equal.", expectedCount, channels[idx].getMembers().length);
    }

    Member elected = coordinators[0].getCoordinator();
    for (int attempt = 0; elected == null && attempt < 100; attempt++) {
        try {
            Thread.sleep(100);
            elected = coordinators[0].getCoordinator();
        } catch (Exception ignored) {
            /* Ignore */
        }
    }

    for (int idx = 0; idx < CHANNEL_COUNT; idx++) {
        assertEquals(elected, coordinators[idx].getCoordinator());
    }
    System.out.println("Coordinator[1] is:" + elected);
}
protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); Arrays.fill(view,msg.getMembers()); viewId = msg.getId(); if ( viewId.equals(suggestedviewId) ) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { suggestedView = null; suggestedviewId = null; } viewChange(viewId,view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); } }
/** * Helper method to broadcast a message to all members in a channel * * @param msgtype * int * @param rpc * boolean * @throws ChannelException */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. if (members.length == 0) return; // send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); if (rpc) { Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (int i = 0; i < resp.length; i++) { mapMemberAdded(resp[i].getSource()); messageReceived(resp[i].getMessage(), resp[i].getSource()); } } else { log.warn("broadcast received 0 replies, probably a timeout."); } } else { channel.send(channel.getMembers(), msg, channelSendOptions); } }
protected void handleViewConf(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (viewId != null && msg.getId().equals(viewId)) return;// we already have this view view = new Membership((MemberImpl) getLocalMember(false), AbsoluteOrder.comp, true); Arrays.fill(view, msg.getMembers()); viewId = msg.getId(); if (viewId.equals(suggestedviewId)) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0], merged.getMembers()[0]) < 0) { suggestedView = null; suggestedviewId = null; } viewChange(viewId, view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX, this, "Accepted View")); if (suggestedviewId == null && hasHigherPriority(merged.getMembers(), membership.getMembers())) { startElection(false); } }
@Override public void memberAdded(Member member) { if ( membership == null ) setupMembership(); boolean notify = false; synchronized (membership) { if (removeSuspects.containsKey(member)) { //previously marked suspect, system below picked up the member again removeSuspects.remove(member); } else if (membership.getMember(member) == null){ //if we add it here, then add it upwards too //check to see if it is alive if (memberAlive(member)) { membership.memberAlive( (MemberImpl) member); notify = true; } else { addSuspects.put(member, Long.valueOf(System.currentTimeMillis())); } } } if ( notify ) super.memberAdded(member); }
/**
 * Receives a {@code Data} message and updates the error / no-error
 * counters. A message that fails {@code Data.verify} only prints "ERROR".
 * A verified message flagged as an error is counted and then rejected with
 * an {@link IllegalArgumentException}; any other verified message is
 * counted as a success. Statistics are printed on every 100th count.
 *
 * @param s the received payload, cast to {@code Data}
 * @param m the sending member (not used here)
 */
@Override
public void messageReceived(Serializable s, Member m) {
    Data payload = (Data) s;
    if (!Data.verify(payload)) {
        System.err.println("ERROR");
        return;
    }

    if (payload.error) {
        errCnt++;
        if ((errCnt % 100) == 0) {
            printStats(System.err);
        }
        throw new IllegalArgumentException();
    }

    noErrCnt++;
    if ((noErrCnt % 100) == 0) {
        printStats(System.err);
    }
}
/**
 * Forwards the message to the next interceptor while keeping transmit
 * statistics: megabytes sent (per destination and per application
 * message), elapsed transmit time, message count and error count. The
 * {@code access} counter tracks how many sends are in flight; the clock is
 * started when it goes to one and stopped when it returns to zero.
 *
 * @param destination members to send to
 * @param msg         the message; cast to {@code ChannelData} to read its size
 * @param payload     interceptor payload passed through unchanged
 * @throws ChannelException rethrown from the underlying send after the
 *                          error counter has been updated
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    if (access.addAndGet(1) == 1) {
        txStart = System.currentTimeMillis();
    }
    final long bytes = XByteBuffer.getDataPackageLength(((ChannelData) msg).getDataPackageLength());

    try {
        super.sendMessage(destination, msg, payload);
    } catch (ChannelException sendFailure) {
        msgTxErr.addAndGet(1);
        if (access.get() == 1) {
            access.addAndGet(-1);
        }
        throw sendFailure;
    }

    mbTx += (bytes * destination.length) / (1024d * 1024d);
    mbAppTx += bytes / (1024d * 1024d);

    if (access.addAndGet(-1) == 0) {
        final long txStop = System.currentTimeMillis();
        timeTx += (txStop - txStart) / 1000d;
        // report once per completed interval of sent messages
        if ((msgTxCnt.get() / interval) >= lastCnt) {
            lastCnt++;
            report(timeTx);
        }
    }
    msgTxCnt.addAndGet(1);
}
/**
 * Returns the local member.
 *
 * @param alive when {@code true} (and both the local member and the
 *              service implementation are set) the member's alive time is
 *              first refreshed to the time elapsed since the service start
 * @return the local member, possibly {@code null}
 */
@Override
public Member getLocalMember(boolean alive) {
    boolean refreshAliveTime = alive && localMember != null && impl != null;
    if (refreshAliveTime) {
        long sinceStart = System.currentTimeMillis() - impl.getServiceStartTime();
        localMember.setMemberAliveTime(sinceStart);
    }
    return localMember;
}
@Override public int compare(Member m1, Member m2) { //longer alive time, means sort first long result = m2.getMemberAliveTime() - m1.getMemberAliveTime(); if (result < 0) return -1; else if (result == 0) return 0; else return 1; }
/** * handle receive sessions from other not ( restart ) * @param msg * @param sender * @throws ClassNotFoundException * @throws IOException */ protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException { counterReceive_EVT_ALL_SESSION_DATA++; if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin",getName())); byte[] data = msg.getSession(); deserializeSessions(data); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter",getName())); //stateTransferred = true; }
/**
 * Handles a "session accessed" message from another node: counts it and,
 * if the session exists locally, records an access on it and marks it as
 * no longer being the primary session.
 *
 * @param msg    the message identifying the accessed session
 * @param sender the member that sent the message (not used here)
 * @throws IOException declared by the handler contract
 */
protected void handleSESSION_ACCESSED(SessionMessage msg, Member sender) throws IOException {
    counterReceive_EVT_SESSION_ACCESSED++;

    DeltaSession accessed = (DeltaSession) findSession(msg.getSessionID());
    if (accessed == null) {
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.receiveMessage.accessed", getName(), msg.getSessionID()));
    }
    accessed.access();
    accessed.setPrimarySession(false);
    accessed.endAccess();
}
/**
 * Sends a message, fragmenting it first when it is larger than
 * {@code maxSize} and its options are accepted by {@code okToProcess}.
 * A message that is not fragmented gets a {@code false} flag appended to
 * its buffer before being passed on.
 *
 * @param destination members to send to
 * @param msg         the message to send
 * @param payload     interceptor payload passed through unchanged
 * @throws ChannelException propagated from the underlying send
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    int size = msg.getMessage().getLength();
    boolean frag = (size > maxSize) && okToProcess(msg.getOptions());
    if (frag) {
        frag(destination, msg, payload);
        return;
    }
    // mark the message as not fragmented
    msg.getMessage().append(frag);
    super.sendMessage(destination, msg, payload);
}
/**
 * Sends three test messages from the first channel to the second one and
 * then waits for them to be received, failing the test when the wait
 * exceeds the allowed time.
 *
 * @param delay pause in milliseconds between consecutive sends (no pause
 *              after the last one, or when not positive)
 * @param sleep maximum time in milliseconds to wait for all messages to
 *              be received
 * @throws Exception propagated from sending or sleeping
 */
public void sendMessages(long delay, long sleep) throws Exception {
    resetMessageCounters();
    Member local = channels[0].getLocalMember(true);
    Member dest = channels[1].getLocalMember(true);
    final int total = 3;
    log.info("Sending " + total + " messages from [" + local.getName() + "] to [" + dest.getName()
            + "] with delay of " + delay + " ms between them.");

    for (int sent = 0; sent < total; sent++) {
        channels[0].send(new Member[] { dest }, new TestMsg(), 0);
        boolean moreToSend = sent != total - 1;
        if (moreToSend && delay > 0) {
            Thread.sleep(delay);
        }
    }

    log.info("Messages sent. Waiting no more than " + (sleep / 1000) + " seconds for them to be received");
    final long waitStart = System.currentTimeMillis();
    int received = getReceivedMessageCount();
    while (received != total) {
        long now = System.currentTimeMillis();
        if ((now - waitStart) > sleep) {
            fail("Only " + received + " out of " + total + " messages have been received in " + (sleep / 1000)
                    + " seconds");
            break;
        }
        Thread.sleep(100);
        received = getReceivedMessageCount();
    }
}
protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //I am the new leader //startElection(false); } else { msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
public Member[] getMapMembersExcl(Member[] exclude) { synchronized (mapMembers) { @SuppressWarnings("unchecked") // mapMembers has the correct type HashMap<Member, Long> list = (HashMap<Member, Long>)mapMembers.clone(); for (int i=0; i<exclude.length;i++) list.remove(exclude[i]); return getMapMembers(list); } }
/**
 * Passes a message down the chain. Messages larger than {@code maxSize}
 * whose options pass {@code okToProcess} are handed to {@code frag};
 * all others have a {@code false} flag appended and are sent as they are.
 *
 * @param destination members to send to
 * @param msg         the message to send
 * @param payload     interceptor payload passed through unchanged
 * @throws ChannelException propagated from the underlying send
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    final int length = msg.getMessage().getLength();
    final boolean fragment = (length > maxSize) && okToProcess(msg.getOptions());
    if (!fragment) {
        // flag the buffer as carrying an unfragmented message
        msg.getMessage().append(fragment);
        super.sendMessage(destination, msg, payload);
    } else {
        frag(destination, msg, payload);
    }
}
/**
 * Decides whether a message is meant for this map: it must be a
 * {@code MapMessage} whose map id equals this map's context name.
 *
 * @param msg    the received message
 * @param sender the sending member (not used here)
 * @return {@code true} if the message is a map message for this map
 */
@Override
public boolean accept(Serializable msg, Member sender) {
    if (!(msg instanceof MapMessage)) {
        return false;
    }
    if (log.isTraceEnabled()) {
        log.trace("Map[" + mapname + "] accepting...." + msg);
    }
    boolean accepted = Arrays.equals(mapContextName, ((MapMessage) msg).getMapId());
    if (log.isTraceEnabled()) {
        log.trace("Msg[" + mapname + "] accepted[" + accepted + "]...." + msg);
    }
    return accepted;
}
/**
 * Returns the outgoing counter for a member, creating and storing a new
 * one when none exists yet.
 *
 * @param mbr the member whose counter is wanted
 * @return the existing or newly created counter
 */
protected Counter getOutCounter(Member mbr) {
    Counter existing = outcounter.get(mbr);
    if (existing != null) {
        return existing;
    }
    Counter created = new Counter();
    outcounter.put(mbr, created);
    return created;
}
@Test public void testDataSendASYNC() throws Exception { System.err.println("Starting ASYNC"); for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),Channel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP); //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500); System.err.println("Finished ASYNC"); assertEquals("Checking success messages.",msgCount,listener1.count.get()); }
public void messageReceived(ClusterMessage message) { if (log.isDebugEnabled() && message != null) log.debug("Assuming clocks are synched: Replication for " + message.getUniqueId() + " took=" + (System.currentTimeMillis() - (message).getTimestamp()) + " ms."); //invoke all the listeners boolean accepted = false; if (message != null) { for (Iterator<ClusterListener> iter = clusterListeners.iterator(); iter.hasNext();) { ClusterListener listener = iter.next(); if (listener.accept(message)) { accepted = true; listener.messageReceived(message); } } if (!accepted && notifyLifecycleListenerOnFailure) { Member dest = message.getAddress(); // Notify our interested LifecycleListeners fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT, new SendMessageData(message, dest, null)); if (log.isDebugEnabled()) { log.debug("Message " + message.toString() + " from type " + message.getClass().getName() + " transfered but no listener registered"); } } } return; }
protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { MemberImpl local = (MemberImpl)getLocalMember(false); if ( local.equals(msg.getSource()) ) { //my message msg.src=local handleMyToken(local, msg, sender,merged); } else { handleOtherToken(local, msg, sender,merged); } }
/**
 * Distributes a web application to the other members of the cluster. The
 * archive is read fragment by fragment through a
 * {@code FileMessageFactory} and every fragment is sent to each cluster
 * member in turn. Nothing is done when the cluster has no members.
 *
 * @param contextName
 *            the context name under which the application is to be
 *            installed
 * @param webapp
 *            a WAR file or unpacked directory structure containing the
 *            web application
 *
 * @exception IOException
 *                if an input/output error was encountered
 */
@Override
public void install(String contextName, File webapp) throws IOException {
    Member[] targets = getCluster().getMembers();
    if (targets.length == 0) {
        return;
    }

    Member localMember = getCluster().getLocalMember();
    FileMessageFactory factory = FileMessageFactory.getInstance(webapp, false);
    FileMessage fragment = new FileMessage(localMember, webapp.getName(), contextName);
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("farmWarDeployer.sendStart", contextName, webapp));
    }

    // readMessage returns null once there is nothing left to read
    for (fragment = factory.readMessage(fragment); fragment != null; fragment = factory.readMessage(fragment)) {
        for (Member target : targets) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("farmWarDeployer.sendFragment", contextName, webapp, target));
            }
            getCluster().send(fragment, target);
        }
    }

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("farmWarDeployer.sendEnd", contextName, webapp));
    }
}
/**
 * Stops the channel whose local member is the current coordinator, waits
 * one second, and then checks that the remaining coordinators (from index
 * 1 on, skipping the stopped one) agree with the coordinator reported by
 * a surviving node.
 */
@Test
public void testCoord2() throws Exception {
    Member member = coordinators[1].getCoordinator();
    System.out.println("Coordinator[2a] is:" + member);

    int dead = -1;
    for (int i = 0; i < CHANNEL_COUNT; i++) {
        if (channels[i].getLocalMember(false).equals(member)) {
            System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
            channels[i].stop(Channel.DEFAULT);
            dead = i;
        }
    }
    Thread.sleep(1000);

    // ask node 0, unless node 0 is the one that was stopped
    final int survivor = (dead == 0) ? 1 : 0;
    System.out.println("Member count:" + channels[survivor].getMembers().length);
    member = coordinators[survivor].getCoordinator();

    for (int i = 1; i < CHANNEL_COUNT; i++) {
        if (i == dead) {
            continue;
        }
        assertEquals(member, coordinators[i].getCoordinator());
    }
    System.out.println("Coordinator[2b] is:" + member);
}
/**
 * Builds a multi-line description of this event: type, local member name,
 * coordinator name, view, suggested view, members and info text. Missing
 * local member or coordinator are rendered as an empty string.
 *
 * @return the textual form of this event
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("CoordinationEvent[type=");
    text.append(type);

    text.append("\n\tLocal:");
    Member self = interceptor.getLocalMember(false);
    text.append(self == null ? "" : self.getName());

    text.append("\n\tCoord:");
    text.append(coord == null ? "" : coord.getName());

    text.append("\n\tView:");
    text.append(Arrays.toNameString(view == null ? null : view.getMembers()));

    text.append("\n\tSuggested View:");
    text.append(Arrays.toNameString(suggestedView == null ? null : suggestedView.getMembers()));

    text.append("\n\tMembers:");
    text.append(Arrays.toNameString(mbrs));

    text.append("\n\tInfo:");
    text.append(info);
    text.append("]");
    return text.toString();
}
/**
 * Handles a "session expired" message from another node: counts it and,
 * if the session exists locally, expires it here as well.
 *
 * @param msg    the message identifying the expired session
 * @param sender the member that sent the message (not used here)
 * @throws IOException declared by the handler contract
 */
protected void handleSESSION_EXPIRED(SessionMessage msg, Member sender) throws IOException {
    counterReceive_EVT_SESSION_EXPIRED++;

    DeltaSession expired = (DeltaSession) findSession(msg.getSessionID());
    if (expired == null) {
        return;
    }

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.receiveMessage.expired", getName(), msg.getSessionID()));
    }
    expired.expire(notifySessionListenersOnReplication, false);
}
/**
 * Compresses the message content before passing it to the next
 * interceptor: the buffer's bytes are compressed, the buffer is trimmed by
 * its full length and the compressed bytes are appended in their place.
 *
 * @param destination members to send to
 * @param msg         the message whose content is compressed in place
 * @param payload     interceptor payload passed through unchanged
 * @throws ChannelException if compression fails (wrapping the
 *                          {@link IOException}) or propagated from the
 *                          next interceptor
 */
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
        throws ChannelException {
    try {
        final byte[] compressed = compress(msg.getMessage().getBytes());
        // drop the current content, then put the compressed bytes in its place
        msg.getMessage().trim(msg.getMessage().getLength());
        msg.getMessage().append(compressed, 0, compressed.length);
        getNext().sendMessage(destination, msg, payload);
    } catch (IOException compressionFailure) {
        log.error("Unable to compress byte contents");
        throw new ChannelException(compressionFailure);
    }
}
@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( !okToProcess(msg.getOptions()) ) { super.sendMessage(destination, msg, payload); return; } ChannelException cx = null; for (int i=0; i<destination.length; i++ ) { try { int nr = 0; try { outLock.writeLock().lock(); nr = incCounter(destination[i]); } finally { outLock.writeLock().unlock(); } //reduce byte copy msg.getMessage().append(nr); try { getNext().sendMessage(new Member[] {destination[i]}, msg, payload); } finally { msg.getMessage().trim(4); } }catch ( ChannelException x ) { if ( cx == null ) cx = x; cx.addFaultyMember(x.getFaultyMembers()); } }//for if ( cx != null ) throw cx; }
/** * handle receive sessions from other not ( restart ) * * @param msg * @param sender * @throws ClassNotFoundException * @throws IOException */ protected void handleALL_SESSION_DATA(SessionMessage msg, Member sender) throws ClassNotFoundException, IOException { counterReceive_EVT_ALL_SESSION_DATA++; if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin", getName())); byte[] data = msg.getSession(); deserializeSessions(data); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter", getName())); // stateTransferred = true; }
/**
 * Tells whether a member is contained in an array of members, using
 * {@code m.equals(...)} for the comparison.
 *
 * @param m   the member to look for
 * @param set the members to search, may be {@code null}
 * @return {@code true} if an equal member is found; {@code false} if not
 *         or if {@code set} is {@code null}
 */
public boolean inSet(Member m, Member[] set) {
    if (set == null) {
        return false;
    }
    for (Member candidate : set) {
        if (m.equals(candidate)) {
            return true;
        }
    }
    return false;
}
/**
 * Handles a "change session id" message from another node: counts it and,
 * if the session exists locally, marks it as non-primary and gives it the
 * new id carried by the message. Container listeners are informed when
 * {@code notifyContainerListenersOnReplication} is set.
 *
 * @param msg    the message with the old session id and the serialized new id
 * @param sender the member that sent the message (not used here)
 * @throws IOException propagated from deserializing the new session id
 */
protected void handleCHANGE_SESSION_ID(SessionMessage msg, Member sender) throws IOException {
    counterReceive_EVT_CHANGE_SESSION_ID++;

    DeltaSession target = (DeltaSession) findSession(msg.getSessionID());
    if (target == null) {
        return;
    }

    String newSessionID = deserializeSessionId(msg.getSession());
    target.setPrimarySession(false);
    target.setId(newSessionID, false);

    if (notifyContainerListenersOnReplication) {
        String[] oldAndNewIds = new String[] { msg.getSessionID(), newSessionID };
        getContainer().fireContainerEvent(Context.CHANGE_SESSION_ID_EVENT, oldAndNewIds);
    }
}
public MemberImpl getMember(Member mbr) { if (hasMembers()) { MemberImpl result = null; for (int i = 0; i < this.members.length && result == null; i++) { if (members[i].equals(mbr)) result = members[i]; } // for return result; } else { return null; } }
/**
 * Logs the added member and wakes up every thread waiting on
 * {@code mutex}.
 *
 * @param member the member that was added
 */
@Override
public void memberAdded(Member member) {
    log.info("Member added:" + member);
    // wake up all threads waiting on the mutex
    synchronized (mutex) {
        mutex.notifyAll();
    }
}
/**
 * Find the master of the session state. The first member returned by the
 * cluster is used as the master.
 * <p>
 * A missing master is reported at warn level. A found master is reported
 * at debug level, matching the {@code isDebugEnabled()} guard on that
 * message.
 *
 * @return master member of sessions, or {@code null} if the cluster has
 *         no members
 */
protected Member findSessionMasterMember() {
    Member mbr = null;
    Member mbrs[] = cluster.getMembers();
    if (mbrs.length != 0) {
        mbr = mbrs[0];
    }
    if (mbr == null && log.isWarnEnabled()) {
        log.warn(sm.getString("deltaManager.noMasterMember", getName(), ""));
    }
    if (mbr != null && log.isDebugEnabled()) {
        // informational only: log at the level the guard checks for
        log.debug(sm.getString("deltaManager.foundMasterMember", getName(), mbr));
    }
    return mbr;
}
/**
 * Intentionally does nothing for the given member.
 *
 * @param member the member being removed (ignored)
 */
@Override
public void remove(Member member) {
    // No-op for now: this should not cancel out any keys, since doing so
    // can create serious sync issues. All TCP connections are cleared out
    // through keepalive, and when the remote node disappears.
}
public void mapMemberAdded(Member member) { if (member.equals(getChannel().getLocalMember(false))) return; boolean memberAdded = false; // select a backup node if we don't have one Member mapMember = getChannel().getMember(member); if (mapMember == null) { log.warn("Notified member is not registered in the membership:" + member); return; } synchronized (mapMembers) { if (!mapMembers.containsKey(mapMember)) { if (log.isInfoEnabled()) log.info("Map member added:" + mapMember); mapMembers.put(mapMember, Long.valueOf(System.currentTimeMillis())); memberAdded = true; } } if (memberAdded) { synchronized (stateMutex) { Iterator<Map.Entry<K, MapEntry<K, V>>> i = innerMap.entrySet().iterator(); while (i.hasNext()) { Map.Entry<K, MapEntry<K, V>> e = i.next(); MapEntry<K, V> entry = innerMap.get(e.getKey()); if (entry == null) continue; if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { try { Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { log.error("Unable to select backup node.", x); } // catch } // end if } // while } // synchronized } // end if }