/** * Adds an interceptor to the stack for message processing<br> * Interceptors are ordered in the way they are added.<br> * <code>channel.addInterceptor(A);</code><br> * <code>channel.addInterceptor(C);</code><br> * <code>channel.addInterceptor(B);</code><br> * Will result in a interceptor stack like this:<br> * <code>A -> C -> B</code><br> * The complete stack will look like this:<br> * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br> * @param interceptor ChannelInterceptorBase */ @Override public void addInterceptor(ChannelInterceptor interceptor) { if ( interceptors == null ) { interceptors = interceptor; interceptors.setNext(coordinator); interceptors.setPrevious(null); coordinator.setPrevious(interceptors); } else { ChannelInterceptor last = interceptors; while ( last.getNext() != coordinator ) { last = last.getNext(); } last.setNext(interceptor); interceptor.setNext(coordinator); interceptor.setPrevious(last); coordinator.setPrevious(interceptor); } }
@Override public synchronized void start(int svc) throws ChannelException { super.start(svc); running = true; if ( thread == null && useThread) { thread = new PingThread(); thread.setDaemon(true); String channelName = ""; if (getChannel() instanceof GroupChannel && ((GroupChannel)getChannel()).getName() != null) { channelName = "[" + ((GroupChannel)getChannel()).getName() + "]"; } thread.setName("TcpPingInterceptor.PingThread" + channelName +"-"+cnt.addAndGet(1)); thread.start(); } //acquire the interceptors to invoke on send ping events ChannelInterceptor next = getNext(); while ( next != null ) { if ( next instanceof TcpFailureDetector ) failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next); if ( next instanceof StaticMembershipInterceptor ) staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next); next = next.getNext(); } }
@Test public void testOptionConflict() throws Exception { boolean error = false; channel.setOptionCheck(true); ChannelInterceptor i = new TestInterceptor(); i.setOptionFlag(128); channel.addInterceptor(i); i = new TestInterceptor(); i.setOptionFlag(128); channel.addInterceptor(i); try { channel.start(Channel.DEFAULT); }catch ( ChannelException x ) { if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true; } assertTrue(error); }
@Test public void testOptionNoConflict() throws Exception { boolean error = false; channel.setOptionCheck(true); ChannelInterceptor i = new TestInterceptor(); i.setOptionFlag(128); channel.addInterceptor(i); i = new TestInterceptor(); i.setOptionFlag(64); channel.addInterceptor(i); i = new TestInterceptor(); i.setOptionFlag(256); channel.addInterceptor(i); try { channel.start(Channel.DEFAULT); }catch ( ChannelException x ) { if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true; } assertFalse(error); }
/** * Adds an interceptor to the stack for message processing<br> * Interceptors are ordered in the way they are added.<br> * <code>channel.addInterceptor(A);</code><br> * <code>channel.addInterceptor(C);</code><br> * <code>channel.addInterceptor(B);</code><br> * Will result in a interceptor stack like this:<br> * <code>A -> C -> B</code><br> * The complete stack will look like this:<br> * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br> * * @param interceptor * ChannelInterceptorBase */ @Override public void addInterceptor(ChannelInterceptor interceptor) { if (interceptors == null) { interceptors = interceptor; interceptors.setNext(coordinator); interceptors.setPrevious(null); coordinator.setPrevious(interceptors); } else { ChannelInterceptor last = interceptors; while (last.getNext() != coordinator) { last = last.getNext(); } last.setNext(interceptor); interceptor.setNext(coordinator); interceptor.setPrevious(last); coordinator.setPrevious(interceptor); } }
@Override public synchronized void start(int svc) throws ChannelException { super.start(svc); running = true; if (thread == null && useThread) { thread = new PingThread(); thread.setDaemon(true); String channelName = ""; if (getChannel() instanceof GroupChannel && ((GroupChannel) getChannel()).getName() != null) { channelName = "[" + ((GroupChannel) getChannel()).getName() + "]"; } thread.setName("TcpPingInterceptor.PingThread" + channelName + "-" + cnt.addAndGet(1)); thread.start(); } // acquire the interceptors to invoke on send ping events ChannelInterceptor next = getNext(); while (next != null) { if (next instanceof TcpFailureDetector) failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector) next); if (next instanceof StaticMembershipInterceptor) staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor) next); next = next.getNext(); } }
@Override public synchronized void start(int svc) throws ChannelException { super.start(svc); running = true; if ( thread == null && useThread) { thread = new PingThread(); thread.setDaemon(true); thread.setName("TcpPingInterceptor.PingThread-"+cnt.addAndGet(1)); thread.start(); } //acquire the interceptors to invoke on send ping events ChannelInterceptor next = getNext(); while ( next != null ) { if ( next instanceof TcpFailureDetector ) failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next); if ( next instanceof StaticMembershipInterceptor ) staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next); next = next.getNext(); } }
@Override public synchronized void start(int svc) throws ChannelException { super.start(svc); running = true; if ( thread == null ) { thread = new PingThread(); thread.setDaemon(true); thread.setName("TcpPingInterceptor.PingThread-"+cnt.addAndGet(1)); thread.start(); } //acquire the interceptors to invoke on send ping events ChannelInterceptor next = getNext(); while ( next != null ) { if ( next instanceof TcpFailureDetector ) failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next); if ( next instanceof StaticMembershipInterceptor ) staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next); next = next.getNext(); } }
/** * Validates the option flags that each interceptor is using and reports * an error if two interceptor share the same flag. * @throws ChannelException */ protected void checkOptionFlags() throws ChannelException { StringBuilder conflicts = new StringBuilder(); ChannelInterceptor first = interceptors; while ( first != null ) { int flag = first.getOptionFlag(); if ( flag != 0 ) { ChannelInterceptor next = first.getNext(); while ( next != null ) { int nflag = next.getOptionFlag(); if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) { conflicts.append("["); conflicts.append(first.getClass().getName()); conflicts.append(":"); conflicts.append(flag); conflicts.append(" == "); conflicts.append(next.getClass().getName()); conflicts.append(":"); conflicts.append(nflag); conflicts.append("] "); }//end if next = next.getNext(); }//while }//end if first = first.getNext(); }//while if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString()); }
@Override public ChannelInterceptor next() { ChannelInterceptor result = null; if ( hasNext() ) { result = start; start = start.getNext(); } return result; }
public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) { this.type = type; this.interceptor = interceptor; this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator(); this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers(); this.info = info; this.view = ((NonBlockingCoordinator)interceptor).view; this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView; }
/** * Send notifications upwards * @param svc int * @throws ChannelException */ @Override public void start(int svc) throws ChannelException { if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) super.start(Channel.SND_RX_SEQ); if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) super.start(Channel.SND_TX_SEQ); final ChannelInterceptorBase base = this; for (final Member member : members) { Thread t = new Thread() { @Override public void run() { base.memberAdded(member); if (getfirstInterceptor().getMember(member) != null) { sendLocalMember(new Member[]{member}); } } }; t.start(); } super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ)); // check required interceptors TcpFailureDetector failureDetector = null; TcpPingInterceptor pingInterceptor = null; ChannelInterceptor prev = getPrevious(); while (prev != null) { if (prev instanceof TcpFailureDetector ) failureDetector = (TcpFailureDetector) prev; if (prev instanceof TcpPingInterceptor) pingInterceptor = (TcpPingInterceptor) prev; prev = prev.getPrevious(); } if (failureDetector == null) { log.warn("There is no TcpFailureDetector. Automatic detection of static members does" + " not work properly. By defining the StaticMembershipInterceptor under the" + " TcpFailureDetector, automatic detection of the static members will work."); } if (pingInterceptor == null) { log.warn("There is no TcpPingInterceptor. The health check of static member does" + " not work properly. By defining the TcpPingInterceptor, the health check of" + " static member will work."); } }
protected ChannelInterceptor getfirstInterceptor() { ChannelInterceptor result = null; ChannelInterceptor now = this; do { result = now; now = now.getPrevious(); } while (now.getPrevious() != null); return result; }
/** * Validates the option flags that each interceptor is using and reports an * error if two interceptor share the same flag. * * @throws ChannelException */ protected void checkOptionFlags() throws ChannelException { StringBuilder conflicts = new StringBuilder(); ChannelInterceptor first = interceptors; while (first != null) { int flag = first.getOptionFlag(); if (flag != 0) { ChannelInterceptor next = first.getNext(); while (next != null) { int nflag = next.getOptionFlag(); if (nflag != 0 && (((flag & nflag) == flag) || ((flag & nflag) == nflag))) { conflicts.append("["); conflicts.append(first.getClass().getName()); conflicts.append(":"); conflicts.append(flag); conflicts.append(" == "); conflicts.append(next.getClass().getName()); conflicts.append(":"); conflicts.append(nflag); conflicts.append("] "); } // end if next = next.getNext(); } // while } // end if first = first.getNext(); } // while if (conflicts.length() > 0) throw new ChannelException("Interceptor option flag conflict: " + conflicts.toString()); }
/** * Returns the first interceptor of the stack. Useful for traversal. * * @return ChannelInterceptor */ public ChannelInterceptor getFirstInterceptor() { if (interceptors != null) return interceptors; else return coordinator; }
@Override public ChannelInterceptor next() { ChannelInterceptor result = null; if (hasNext()) { result = start; start = start.getNext(); } return result; }
public CoordinationEvent(int type, ChannelInterceptor interceptor, String info) { this.type = type; this.interceptor = interceptor; this.coord = ((NonBlockingCoordinator) interceptor).getCoordinator(); this.mbrs = ((NonBlockingCoordinator) interceptor).membership.getMembers(); this.info = info; this.view = ((NonBlockingCoordinator) interceptor).view; this.suggestedView = ((NonBlockingCoordinator) interceptor).suggestedView; }