private static void invokeAsynchronously( Object impl, ThriftFunction func, TBase<?, ?> args, DefaultRpcResponse reply) throws TException { final AsyncProcessFunction<Object, TBase<?, ?>, Object> f = func.asyncFunc(); f.start(impl, args, new AsyncMethodCallback<Object>() { @Override public void onComplete(Object response) { if (func.isOneWay()) { reply.complete(null); } else { reply.complete(response); } } @Override public void onError(Exception e) { reply.completeExceptionally(e); } }); }
@SuppressWarnings("rawtypes") private void registerFunction(Set<String> methodNames, Class<?> iface, String name, Object func) { checkDuplicateMethodName(methodNames, name); methodNames.add(name); try { final ThriftFunction f; if (func instanceof ProcessFunction) { f = new ThriftFunction(iface, (ProcessFunction) func); } else { f = new ThriftFunction(iface, (AsyncProcessFunction) func); } functions.put(name, f); } catch (Exception e) { throw new IllegalArgumentException("failed to retrieve function metadata: " + iface.getName() + '.' + name + "()", e); } }
private Set<Class<?>> init(Object implementation, Iterable<Class<?>> candidateInterfaces) { // Build the map of method names and their corresponding process functions. final Set<String> methodNames = new HashSet<>(); final Set<Class<?>> interfaces = new HashSet<>(); for (Class<?> iface : candidateInterfaces) { final Map<String, AsyncProcessFunction<?, ?, ?>> asyncProcessMap; asyncProcessMap = getThriftAsyncProcessMap(implementation, iface); if (asyncProcessMap != null) { asyncProcessMap.forEach( (name, func) -> registerFunction(methodNames, iface, name, func)); interfaces.add(iface); } final Map<String, ProcessFunction<?, ?>> processMap; processMap = getThriftProcessMap(implementation, iface); if (processMap != null) { processMap.forEach( (name, func) -> registerFunction(methodNames, iface, name, func)); interfaces.add(iface); } } if (functions.isEmpty()) { if (implementation != null) { throw new IllegalArgumentException('\'' + implementation.getClass().getName() + "' is not a Thrift service implementation."); } else { throw new IllegalArgumentException("not a Thrift service interface: " + candidateInterfaces); } } return Collections.unmodifiableSet(interfaces); }
private static Map<String, AsyncProcessFunction<?, ?, ?>> getThriftAsyncProcessMap( Object service, Class<?> iface) { final String name = iface.getName(); if (!name.endsWith("$AsyncIface")) { return null; } final String processorName = name.substring(0, name.length() - 10) + "AsyncProcessor"; try { Class<?> processorClass = Class.forName(processorName, false, iface.getClassLoader()); if (!TBaseAsyncProcessor.class.isAssignableFrom(processorClass)) { return null; } final Constructor<?> processorConstructor = processorClass.getConstructor(iface); @SuppressWarnings("rawtypes") final TBaseAsyncProcessor processor = (TBaseAsyncProcessor) processorConstructor.newInstance(service); @SuppressWarnings("unchecked") Map<String, AsyncProcessFunction<?, ?, ?>> processMap = (Map<String, AsyncProcessFunction<?, ?, ?>>) processor.getProcessMapView(); return processMap; } catch (Exception e) { logger.debug("Failed to retrieve the asynchronous process map from:: {}", iface, e); return null; } }
@SuppressWarnings("rawtypes") public AsyncServiceProcessor(AsyncService service) { super(service, getProcessMap(new HashMap<String, AsyncProcessFunction<AsyncService, ? extends TBase, ?>>())); }
@SuppressWarnings("rawtypes") private static Map<String, AsyncProcessFunction<AsyncService, ? extends TBase, ?>> getProcessMap( Map<String, AsyncProcessFunction<AsyncService, ? extends TBase, ?>> processMap) { processMap.put(FUNCTION_NAME, new GetAsyncProcessFunction()); return processMap; }
ThriftFunction(Class<?> serviceType, AsyncProcessFunction<?, ?, ?> func) throws Exception { this(serviceType, func.getMethodName(), func, Type.ASYNC, getArgFields(func), getResult(func), getDeclaredExceptions(func)); }
private static TBase<?, ?> getResult(AsyncProcessFunction<?, ?, ?> asyncFunc) { return getResult0(Type.ASYNC, asyncFunc.getClass(), asyncFunc.getMethodName()); }
private static TBase<?, ?> getArgs(AsyncProcessFunction<?, ?, ?> asyncFunc) { return getArgs0(Type.ASYNC, asyncFunc.getClass(), asyncFunc.getMethodName()); }
private static TFieldIdEnum[] getArgFields(AsyncProcessFunction<?, ?, ?> asyncFunc) { return getArgFields0(Type.ASYNC, asyncFunc.getClass(), asyncFunc.getMethodName()); }
private static Class<?>[] getDeclaredExceptions(AsyncProcessFunction<?, ?, ?> asyncFunc) { return getDeclaredExceptions0(Type.ASYNC, asyncFunc.getClass(), asyncFunc.getMethodName()); }
/** * Returns the {@link AsyncProcessFunction}. * * @throws ClassCastException if this function is synchronous */ @SuppressWarnings("unchecked") public AsyncProcessFunction<Object, TBase<?, ?>, Object> asyncFunc() { return (AsyncProcessFunction<Object, TBase<?, ?>, Object>) func; }