@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { super.sendMessage(destination, msg, payload); } catch (ChannelException cx) { FaultyMember[] mbrs = cx.getFaultyMembers(); for (int i = 0; i < mbrs.length; i++) { if (mbrs[i].getCause() != null && (!(mbrs[i].getCause() instanceof RemoteProcessException))) {// RemoteProcessException's // are // ok this.memberDisappeared(mbrs[i].getMember()); } // end if } // for throw cx; } }
@Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { super.sendMessage(destination, msg, payload); }catch ( ChannelException cx ) { FaultyMember[] mbrs = cx.getFaultyMembers(); for ( int i=0; i<mbrs.length; i++ ) { if ( mbrs[i].getCause()!=null && (!(mbrs[i].getCause() instanceof RemoteProcessException)) ) {//RemoteProcessException's are ok this.memberDisappeared(mbrs[i].getMember()); }//end if }//for throw cx; } }
/** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object * @param value Object * @return Member - the backup node * @throws ChannelException */ @Override protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (! (key instanceof Serializable && value instanceof Serializable) ) return new Member[0]; //select a backup node Member[] backup = getMapMembers(); if (backup == null || backup.length == 0) return null; try { //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable) key, (Serializable) value, null,channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } catch (ChannelException e) { FaultyMember[] faultyMembers = e.getFaultyMembers(); if (faultyMembers.length == 0) throw e; ArrayList<Member> faulty = new ArrayList<Member>(); for (FaultyMember faultyMember : faultyMembers) { if (!(faultyMember.getCause() instanceof RemoteProcessException)) { faulty.add(faultyMember.getMember()); } } Member[] realFaultyMembers = faulty.toArray(new Member[faulty.size()]); if (realFaultyMembers.length != 0) { backup = excludeFromSet(realFaultyMembers, backup); if (backup.length == 0) { throw e; } else { if (log.isWarnEnabled()) { log.warn("Unable to replicate backup key:" + key + ". Success nodes:" + Arrays.toString(backup) + ". Failed nodes:" + Arrays.toString(realFaultyMembers), e); } } } } return backup; }
/** * publish info about a map pair (key/value) to other nodes in the cluster * * @param key * Object * @param value * Object * @return Member - the backup node * @throws ChannelException */ @Override protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; // select a backup node Member[] backup = getMapMembers(); if (backup == null || backup.length == 0) return null; try { // publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable) key, (Serializable) value, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } catch (ChannelException e) { FaultyMember[] faultyMembers = e.getFaultyMembers(); if (faultyMembers.length == 0) throw e; ArrayList<Member> faulty = new ArrayList<Member>(); for (FaultyMember faultyMember : faultyMembers) { if (!(faultyMember.getCause() instanceof RemoteProcessException)) { faulty.add(faultyMember.getMember()); } } Member[] realFaultyMembers = faulty.toArray(new Member[faulty.size()]); if (realFaultyMembers.length != 0) { backup = excludeFromSet(realFaultyMembers, backup); if (backup.length == 0) { throw e; } else { if (log.isWarnEnabled()) { log.warn("Unable to replicate backup key:" + key + ". Success nodes:" + Arrays.toString(backup) + ". Failed nodes:" + Arrays.toString(realFaultyMembers), e); } } } } return backup; }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[mapMembers.size()]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member); if ( (now - access) > timeout ) { memberDisappeared(member); } } }//synch }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[mapMembers.size()]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member).longValue(); if ( (now - access) > timeout ) { memberDisappeared(member); } } }//synch }
/** * Sends a ping out to all the members in the cluster, not just map members * that this map is alive. * @param timeout long * @throws ChannelException */ protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel.getLocalMember(false), null); if ( channel.getMembers().length > 0 ) { try { //send a ping, wait for all nodes to reply Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), (int) accessTimeout); for (int i = 0; i < resp.length; i++) { memberAlive(resp[i].getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } } } //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Iterator<Map.Entry<Member, Long>> it = mapMembers.entrySet().iterator(); long now = System.currentTimeMillis(); while ( it.hasNext() ) { Map.Entry<Member,Long> entry = it.next(); long access = entry.getValue().longValue(); if ( (now - access) > timeout ) { it.remove(); memberDisappeared(entry.getKey()); } } }//synch }