@Override public void memberAdded(Member member) { if ( membership == null ) setupMembership(); boolean notify = false; synchronized (membership) { if (removeSuspects.containsKey(member)) { //previously marked suspect, system below picked up the member again removeSuspects.remove(member); } else if (membership.getMember(member) == null){ //if we add it here, then add it upwards too //check to see if it is alive if (memberAlive(member)) { membership.memberAlive( (MemberImpl) member); notify = true; } else { addSuspects.put(member, Long.valueOf(System.currentTimeMillis())); } } } if ( notify ) super.memberAdded(member); }
protected void performForcedCheck() { //update all alive times Member[] members = super.getMembers(); for (int i = 0; members != null && i < members.length; i++) { if (memberAlive(members[i])) { if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]); addSuspects.remove(members[i]); } else { if (membership.getMember(members[i])!=null) { membership.removeMember((MemberImpl)members[i]); removeSuspects.remove(members[i]); if (members[i] instanceof StaticMember) { addSuspects.put(members[i], Long.valueOf(System.currentTimeMillis())); } super.memberDisappeared(members[i]); } } //end if } //for }
protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { int next = Arrays.nextIndex(local,msg.getMembers()); int current = next; msg.leader = msg.getMembers()[0]; boolean sent = false; while ( !sent && current >= 0 ) { try { sendElectionMsg(local, msg.getMembers()[current], msg); sent = true; }catch ( ChannelException x ) { log.warn("Unable to send election message to:"+msg.getMembers()[current]); current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers()); if ( current == next ) throw x; } } }
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //no leadership change if ( Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null); handleViewConf(msg,local,merged); } else { //membership change suggestedView = new Membership(local,AbsoluteOrder.comp,true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView,merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } } else { //leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( viewId != null && msg.getId().equals(viewId) ) return;//we already have this view view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true); Arrays.fill(view,msg.getMembers()); viewId = msg.getId(); if ( viewId.equals(suggestedviewId) ) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { suggestedView = null; suggestedviewId = null; } viewChange(viewId,view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); } }
@Override public void memberDisappeared(Member member) { try { membership.removeMember((MemberImpl)member); super.memberDisappeared(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")")); if ( started && (isCoordinator() || isHighest()) ) startElection(true); //to do, if a member disappears, only the coordinator can start }catch ( ChannelException x ) { log.error("Unable to start election when member was removed.",x); } }finally { } }
@Override public void heartbeat() { try { MemberImpl local = (MemberImpl)getLocalMember(false); if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) { if ( isHighest() ) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this, "Heartbeat found inconsistency, restart election")); startElection(true); } } } catch ( Exception x ){ log.error("Unable to perform heartbeat.",x); } finally { super.heartbeat(); } }
public byte[] getDataPackage(byte[] data, int offset) { byte[] addr = ((MemberImpl)address).getData(false); XByteBuffer.toBytes(options,data,offset); offset += 4; //options XByteBuffer.toBytes(timestamp,data,offset); offset += 8; //timestamp XByteBuffer.toBytes(uniqueId.length,data,offset); offset += 4; //uniqueId.length System.arraycopy(uniqueId,0,data,offset,uniqueId.length); offset += uniqueId.length; //uniqueId data XByteBuffer.toBytes(addr.length,data,offset); offset += 4; //addr.length System.arraycopy(addr,0,data,offset,addr.length); offset += addr.length; //addr data XByteBuffer.toBytes(message.getLength(),data,offset); offset += 4; //message.length System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength()); offset += message.getLength(); //message data return data; }
public static ChannelData getDataFromPackage(byte[] b) { ChannelData data = new ChannelData(false); int offset = 0; data.setOptions(XByteBuffer.toInt(b,offset)); offset += 4; //options data.setTimestamp(XByteBuffer.toLong(b,offset)); offset += 8; //timestamp data.uniqueId = new byte[XByteBuffer.toInt(b,offset)]; offset += 4; //uniqueId length System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length); offset += data.uniqueId.length; //uniqueId data byte[] addr = new byte[XByteBuffer.toInt(b,offset)]; offset += 4; //addr length System.arraycopy(b,offset,addr,0,addr.length); data.setAddress(MemberImpl.getMember(addr)); offset += addr.length; //addr data int xsize = XByteBuffer.toInt(b,offset); //data.message = new XByteBuffer(new byte[xsize],false); data.message = BufferPool.getBufferPool().getBuffer(xsize,false); offset += 4; //message length System.arraycopy(b,offset,data.message.getBytesDirect(),0,xsize); data.message.append(b,offset,xsize); offset += xsize; //message data return data; }
@Override public void memberAdded(Member member) { if (membership == null) setupMembership(); boolean notify = false; synchronized (membership) { if (removeSuspects.containsKey(member)) { // previously marked suspect, system below picked up the member // again removeSuspects.remove(member); } else if (membership.getMember(member) == null) { // if we add it here, then add it upwards too // check to see if it is alive if (memberAlive(member)) { membership.memberAlive((MemberImpl) member); notify = true; } else { addSuspects.put(member, Long.valueOf(System.currentTimeMillis())); } } } if (notify) super.memberAdded(member); }
protected void performForcedCheck() { // update all alive times Member[] members = super.getMembers(); for (int i = 0; members != null && i < members.length; i++) { if (memberAlive(members[i])) { if (membership.memberAlive((MemberImpl) members[i])) super.memberAdded(members[i]); addSuspects.remove(members[i]); } else { if (membership.getMember(members[i]) != null) { membership.removeMember((MemberImpl) members[i]); removeSuspects.remove(members[i]); if (members[i] instanceof StaticMember) { addSuspects.put(members[i], Long.valueOf(System.currentTimeMillis())); } super.memberDisappeared(members[i]); } } // end if } // for }
public static ChannelData getDataFromPackage(byte[] b) { ChannelData data = new ChannelData(false); int offset = 0; data.setOptions(XByteBuffer.toInt(b, offset)); offset += 4; // options data.setTimestamp(XByteBuffer.toLong(b, offset)); offset += 8; // timestamp data.uniqueId = new byte[XByteBuffer.toInt(b, offset)]; offset += 4; // uniqueId length System.arraycopy(b, offset, data.uniqueId, 0, data.uniqueId.length); offset += data.uniqueId.length; // uniqueId data byte[] addr = new byte[XByteBuffer.toInt(b, offset)]; offset += 4; // addr length System.arraycopy(b, offset, addr, 0, addr.length); data.setAddress(MemberImpl.getMember(addr)); offset += addr.length; // addr data int xsize = XByteBuffer.toInt(b, offset); // data.message = new XByteBuffer(new byte[xsize],false); data.message = BufferPool.getBufferPool().getBuffer(xsize, false); offset += 4; // message length System.arraycopy(b, offset, data.message.getBytesDirect(), 0, xsize); data.message.append(b, offset, xsize); offset += xsize; // message data return data; }
protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { int next = Arrays.nextIndex(local, msg.getMembers()); int current = next; msg.leader = msg.getMembers()[0]; boolean sent = false; while (!sent && current >= 0) { try { sendElectionMsg(local, msg.getMembers()[current], msg); sent = true; } catch (ChannelException x) { log.warn("Unable to send election message to:" + msg.getMembers()[current]); current = Arrays.nextIndex(msg.getMembers()[current], msg.getMembers()); if (current == next) throw x; } } }
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE, this, "Pre merge")); MemberImpl local = (MemberImpl) getLocalMember(false); Membership merged = new Membership(local, AbsoluteOrder.comp, true); Arrays.fill(merged, msg.getMembers()); Arrays.fill(merged, getMembers()); Member[] diff = Arrays.diff(merged, membership, local); for (int i = 0; i < diff.length; i++) { if (!alive(diff[i])) merged.removeMember((MemberImpl) diff[i]); else memberAdded(diff[i], false); } fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE, this, "Post merge")); return merged; }
protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (local.equals(msg.getLeader())) { // no leadership change if (Arrays.sameMembers(msg.getMembers(), merged.getMembers())) { msg.type = COORD_CONF; super.sendMessage(Arrays.remove(msg.getMembers(), local), createData(msg, local), null); handleViewConf(msg, local, merged); } else { // membership change suggestedView = new Membership(local, AbsoluteOrder.comp, true); suggestedviewId = msg.getId(); Arrays.fill(suggestedView, merged.getMembers()); msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } } else { // leadership change suggestedView = null; suggestedviewId = null; msg.view = merged.getMembers(); sendElectionMsgToNextInline(local, msg); } }
protected void handleViewConf(CoordinationMessage msg, Member sender, Membership merged) throws ChannelException { if (viewId != null && msg.getId().equals(viewId)) return;// we already have this view view = new Membership((MemberImpl) getLocalMember(false), AbsoluteOrder.comp, true); Arrays.fill(view, msg.getMembers()); viewId = msg.getId(); if (viewId.equals(suggestedviewId)) { suggestedView = null; suggestedviewId = null; } if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0], merged.getMembers()[0]) < 0) { suggestedView = null; suggestedviewId = null; } viewChange(viewId, view.getMembers()); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX, this, "Accepted View")); if (suggestedviewId == null && hasHigherPriority(merged.getMembers(), membership.getMembers())) { startElection(false); } }
public void memberAdded(Member member, boolean elect) { try { if (membership == null) setupMembership(); if (membership.memberAlive((MemberImpl) member)) super.memberAdded(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD, this, "Member add(" + member.getName() + ")")); if (started && elect) startElection(false); } catch (ChannelException x) { log.error("Unable to start election when member was added.", x); } } finally { } }
@Override public void memberDisappeared(Member member) { try { membership.removeMember((MemberImpl) member); super.memberDisappeared(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL, this, "Member remove(" + member.getName() + ")")); if (started && (isCoordinator() || isHighest())) startElection(true); // to do, if a member disappears, only // the coordinator can start } catch (ChannelException x) { log.error("Unable to start election when member was removed.", x); } } finally { } }
@Override public void heartbeat() { try { MemberImpl local = (MemberImpl) getLocalMember(false); if (view != null && (Arrays.diff(view, membership, local).length != 0 || Arrays.diff(membership, view, local).length != 0)) { if (isHighest()) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this, "Heartbeat found inconsistency, restart election")); startElection(true); } } } catch (Exception x) { log.error("Unable to perform heartbeat.", x); } finally { super.heartbeat(); } }
@Override public void memberAdded(Member member) { if (membership == null) setupMembership(); boolean notify = false; synchronized (membership) { notify = Arrays.equals(domain, member.getDomain()); if (notify) notify = membership.memberAlive((MemberImpl) member); } if (notify) { super.memberAdded(member); } else { if (log.isInfoEnabled()) log.info("Member was refused to join cluster[" + member + "]"); } }
public byte[] getDataPackage(byte[] data, int offset) { byte[] addr = ((MemberImpl) address).getData(false); XByteBuffer.toBytes(options, data, offset); offset += 4; // options XByteBuffer.toBytes(timestamp, data, offset); offset += 8; // timestamp XByteBuffer.toBytes(uniqueId.length, data, offset); offset += 4; // uniqueId.length System.arraycopy(uniqueId, 0, data, offset, uniqueId.length); offset += uniqueId.length; // uniqueId data XByteBuffer.toBytes(addr.length, data, offset); offset += 4; // addr.length System.arraycopy(addr, 0, data, offset, addr.length); offset += addr.length; // addr data XByteBuffer.toBytes(message.getLength(), data, offset); offset += 4; // message.length System.arraycopy(message.getBytesDirect(), 0, data, offset, message.getLength()); offset += message.getLength(); // message data return data; }
/** * @deprecated Unused - will be removed in 8.0.x */ @Deprecated protected Member[] readMembers(ObjectInput in) throws IOException { int nodecount = in.readInt(); Member[] members = new Member[nodecount]; for ( int i=0; i<members.length; i++ ) { byte[] d = new byte[in.readInt()]; in.readFully(d); if (d.length > 0) members[i] = MemberImpl.getMember(d); } return members; }
/** * @deprecated Unused - will be removed in 8.0.x */ @Deprecated protected void writeMembers(ObjectOutput out,Member[] members) throws IOException { if ( members == null ) members = new Member[0]; out.writeInt(members.length); for (int i=0; i<members.length; i++ ) { if ( members[i] != null ) { byte[] d = members[i] != null ? ( (MemberImpl)members[i]).getData(false) : new byte[0]; out.writeInt(d.length); out.write(d); } } }
private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) { Membership m = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(m,others); MemberImpl[] mbrs = m.getMembers(); m.reset(); CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), COORD_REQUEST); return msg; }
public Member getNextInLine(MemberImpl local, MemberImpl[] others) { MemberImpl result = null; for ( int i=0; i<others.length; i++ ) { } return result; }
public ChannelData createData(CoordinationMessage msg, MemberImpl local) { msg.write(); ChannelData data = new ChannelData(true); data.setAddress(local); data.setMessage(msg.getBuffer()); data.setOptions(Channel.SEND_OPTIONS_USE_ACK); data.setTimestamp(System.currentTimeMillis()); return data; }
protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge")); MemberImpl local = (MemberImpl)getLocalMember(false); Membership merged = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(merged,msg.getMembers()); Arrays.fill(merged,getMembers()); Member[] diff = Arrays.diff(merged,membership,local); for ( int i=0; i<diff.length; i++ ) { if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); else memberAdded(diff[i],false); } fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge")); return merged; }
protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { MemberImpl local = (MemberImpl)getLocalMember(false); if ( local.equals(msg.getSource()) ) { //my message msg.src=local handleMyToken(local, msg, sender,merged); } else { handleOtherToken(local, msg, sender,merged); } }
protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { if ( local.equals(msg.getLeader()) ) { //I am the new leader //startElection(false); } else { msg.view = merged.getMembers(); sendElectionMsgToNextInline(local,msg); } }
public void memberAdded(Member member,boolean elect) { try { if ( membership == null ) setupMembership(); if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")")); if (started && elect) startElection(false); }catch ( ChannelException x ) { log.error("Unable to start election when member was added.",x); } }finally { } }
public CoordinationMessage(MemberImpl leader, MemberImpl source, MemberImpl[] view, UniqueId id, byte[] type) { this.buf = new XByteBuffer(4096,false); this.leader = leader; this.source = source; this.view = view; this.id = id; this.type = type; this.write(); }