@Override public void verifyServerTraces(PluginTestVerifier verifier) throws Exception { final InetSocketAddress actualServerAddress = super.environment.getServerAddress(); verifier.verifyTraceCount(2); Method process = TBaseProcessor.class.getDeclaredMethod("process", TProtocol.class, TProtocol.class); verifier.verifyDiscreteTrace( // RootSpan - Thrift Server Invocation root("THRIFT_SERVER", // ServiceType, "Thrift Server Invocation", // Method "com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo", // rpc actualServerAddress.getHostName() + ":" + actualServerAddress.getPort(), // endPoint actualServerAddress.getHostName()), // remoteAddress // SpanEvent - TBaseProcessor.process event("THRIFT_SERVER_INTERNAL", process)); verifier.verifyTraceCount(0); }
/**
 * Builds the method URI for the invocation currently tracked by the interceptor scope.
 *
 * <p>The URI is the processor name (as produced by
 * {@link ThriftUtils#getProcessorNameAsUri}) followed by {@code "/"} and the method name
 * taken from the {@link ThriftClientCallContext} attached to the current invocation.
 * When the attachment is not a {@code ThriftClientCallContext} or {@code target} is not a
 * {@code TBaseProcessor}, {@link ThriftConstants#UNKNOWN_METHOD_URI} is returned instead.
 *
 * @param target the intercepted object, expected to be a {@code TBaseProcessor}
 * @return the resolved method URI, or the unknown-method URI when it cannot be resolved
 */
private String getMethodUri(Object target) {
    final Object attachment = this.scope.getCurrentInvocation().getAttachment();

    final boolean resolvable =
            attachment instanceof ThriftClientCallContext && target instanceof TBaseProcessor;
    if (!resolvable) {
        return ThriftConstants.UNKNOWN_METHOD_URI;
    }

    final String methodName = ((ThriftClientCallContext) attachment).getMethodName();
    final String processorUri = ThriftUtils.getProcessorNameAsUri((TBaseProcessor<?>) target);

    // Insert a separator only when the processor URI does not already end with one.
    final String separator = processorUri.endsWith("/") ? "" : "/";
    return processorUri + separator + methodName;
}
public static void main(String[] args) throws Exception { int port = 8083; TBaseProcessor<?> processor = new TCalculator.Processor<TCalculator.Iface>(new CalcIfaceImpl()); ThriftServerDef serverDef = ThriftServerDef.newBuilder().listen(port)// .withProcessor(processor)// .using(Executors.newCachedThreadPool())// .clientIdleTimeout(TimeUnit.SECONDS.toMillis(15)).build(); final ServerBootstrap server = new ServerBootstrap(serverDef); // 启动 Server server.start(); }
/**
 * Looks up the Thrift process map that belongs to a generated {@code $Iface} interface.
 *
 * <p>The sibling {@code $Processor} class name is derived from the interface name, loaded
 * (without initialization) through the interface's class loader, instantiated with
 * {@code service} and asked for its process-map view.
 *
 * @param service the service implementation handed to the processor constructor
 * @param iface   the candidate interface; its name must end with {@code "$Iface"}
 * @return the processor's process map, or {@code null} when {@code iface} is not named
 *         {@code ...$Iface}, the derived class is not a {@code TBaseProcessor}, or any
 *         step of the lookup or instantiation fails (the failure is logged at debug level)
 */
private static Map<String, ProcessFunction<?, ?>> getThriftProcessMap(Object service, Class<?> iface) {
    final String ifaceName = iface.getName();
    if (!ifaceName.endsWith("$Iface")) {
        return null;
    }

    // Keep everything up to and including the '$' and swap "Iface" for "Processor".
    final String outerPrefix = ifaceName.substring(0, ifaceName.lastIndexOf('$') + 1);
    final String processorName = outerPrefix + "Processor";

    try {
        final Class<?> candidate = Class.forName(processorName, false, iface.getClassLoader());
        if (!TBaseProcessor.class.isAssignableFrom(candidate)) {
            return null;
        }

        @SuppressWarnings("rawtypes")
        final TBaseProcessor instance =
                (TBaseProcessor) candidate.getConstructor(iface).newInstance(service);

        @SuppressWarnings("unchecked")
        final Map<String, ProcessFunction<?, ?>> functions =
                (Map<String, ProcessFunction<?, ?>>) instance.getProcessMapView();
        return functions;
    } catch (Exception e) {
        logger.debug("Failed to retrieve the process map from: {}", iface, e);
        return null;
    }
}
/**
 * Returns the processor currently held by this object.
 *
 * <p>The accessor is marked {@code @JsonIgnore}.
 *
 * @return the stored {@code TBaseProcessor}; whatever value the field currently holds
 */
@JsonIgnore
public TBaseProcessor getProcessor() {
    return processor;
}
/**
 * Stores the given processor and returns this object for call chaining.
 *
 * @param processor the processor to store
 * @return this object, cast to the self type {@code T}
 */
public T withProcessor(@SuppressWarnings("rawtypes") TBaseProcessor processor) {
    this.processor = processor;

    // The self-type cast cannot be checked at runtime; the suppression is kept on this local only.
    @SuppressWarnings("unchecked")
    final T self = (T) this;
    return self;
}
/**
 * Creates a processor for the given HTTP request.
 *
 * <p>A new {@code HawkThriftIface} handler is built from the protocol, the request and
 * the Artemis server, and wrapped in an anonymous {@code TBaseProcessor} together with
 * the process map.
 *
 * @param request the servlet request the handler is bound to
 * @return a new processor wrapping the request-specific handler
 */
@Override
public TProcessor create(HttpServletRequest request) {
    final HawkThriftIface handler = new HawkThriftIface(protocol, request, artemisServer);
    return new TBaseProcessor<Hawk.Iface>(handler, processMap) {
    };
}
/**
 * Converts the class name of the given {@link org.apache.thrift.TBaseProcessor TBaseProcessor}
 * into a URI path for use in Pinpoint.
 *
 * <p>Every match of {@link ThriftConstants#PROCESSOR_PATTERN} in the runtime class name is
 * replaced with {@code "."}, and the result is passed through
 * {@code convertDotPathToUriPath}.
 *
 * @param processor the processor whose runtime class name is converted
 * @return the processor name expressed as a URI path
 */
public static String getProcessorNameAsUri(TBaseProcessor<?> processor) {
    final String className = processor.getClass().getName();
    final String dottedName = ThriftConstants.PROCESSOR_PATTERN.matcher(className).replaceAll(".");
    return convertDotPathToUriPath(dottedName);
}