@Override
public void ack(Object msgId) {
    final KestrelSourceId sourceId = (KestrelSourceId) msgId;
    final KestrelClientInfo info = _kestrels.get(sourceId.index);

    // if the transaction didn't exist, confirm() just returns false, so this
    // code works even if the client gets blacklisted, disconnects, and
    // Kestrel puts the item back on the queue
    try {
        if (info.client != null) {
            final HashSet<Long> xids = new HashSet<Long>();
            xids.add(sourceId.id);
            info.client.confirm(_queueName, xids);
        }
    } catch (final TException e) {
        blacklist(info, e);
    }
}
@Override
public void setStormConf(String storm_conf) throws TException {
    LOG.info("setting configuration...");

    // stop processes
    stopSupervisors();
    stopUI();
    stopNimbus();

    Object json = JSONValue.parse(storm_conf);
    Map<?, ?> new_conf = (Map<?, ?>) json;
    _storm_conf.putAll(new_conf);
    Util.rmNulls(_storm_conf);
    setStormHostConf();

    // start processes
    startNimbus();
    startUI();
    startSupervisors();
}
private void downloadCodeFromMaster(Assignment assignment, String topologyId)
        throws IOException, TException {
    try {
        String localRoot = StormConfig.masterStormdistRoot(data.getConf(),
                topologyId);
        String tmpDir = StormConfig.masterInbox(data.getConf()) + "/"
                + UUID.randomUUID().toString();
        String masterCodeDir = assignment.getMasterCodeDir();

        JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir,
                masterCodeDir, topologyId, false);

        FileUtils.moveDirectory(new File(tmpDir), new File(localRoot));
    } catch (TException e) {
        LOG.error("downloadStormCode failed, topologyId:" + topologyId
                + ", masterCodeDir:" + assignment.getMasterCodeDir(), e);
        throw e;
    }
    LOG.info("Finished downloading code for topology id " + topologyId
            + " from " + assignment.getMasterCodeDir());
}
@Override
public void finishFileUpload(String location) throws TException {
    TimeCacheMap<Object, Object> uploaders = data.getUploaders();
    Object obj = uploaders.get(location);
    if (obj == null) {
        throw new TException(
                "File for that location does not exist (or timed out)");
    }
    try {
        if (obj instanceof WritableByteChannel) {
            WritableByteChannel channel = (WritableByteChannel) obj;
            channel.close();
            uploaders.remove(location);
            LOG.info("Finished uploading file from client: " + location);
        } else {
            throw new TException("Object isn't WritableByteChannel for "
                    + location);
        }
    } catch (IOException e) {
        LOG.error("WritableByteChannel close failed when finishFileUpload "
                + location);
    }
}
/**
 * Get the StormTopology by deserializing locally stored topology files.
 *
 * @param id
 *            String: topology id
 * @return StormTopology
 */
@Override
public StormTopology getTopology(String id) throws NotAliveException,
        TException {
    StormTopology topology = null;
    try {
        StormTopology stormtopology = StormConfig
                .read_nimbus_topology_code(conf, id);
        if (stormtopology == null) {
            throw new TException("topology:" + id + " is null");
        }

        Map<Object, Object> topologyConf = StormConfig
                .read_nimbus_topology_conf(conf, id);

        topology = Common.system_topology(topologyConf, stormtopology);
    } catch (Exception e) {
        LOG.error("Failed to get topology " + id + ",", e);
        throw new TException("Failed to get system_topology");
    }
    return topology;
}
@Override
public void metricMonitor(String topologyName, MonitorOptions options)
        throws NotAliveException, TException {
    boolean isEnable = options.is_isEnable();

    StormClusterState clusterState = data.getStormClusterState();

    try {
        String topologyId = Cluster.get_topology_id(clusterState,
                topologyName);
        if (null != topologyId) {
            NimbusUtils.updateMetricMonitorStatus(clusterState, topologyId,
                    isEnable);
        } else {
            throw new NotAliveException(
                    "Failed to update metricsMonitor status as "
                            + topologyName + " is not alive");
        }
    } catch (Exception e) {
        String errMsg = "Failed to update metricsMonitor " + topologyName;
        LOG.error(errMsg, e);
        throw new TException(e);
    }
}
/**
 * No synchronization is needed here, because the EventManager executes
 * events serially.
 *
 * @param conf
 * @param topologyId
 * @param masterCodeDir
 * @throws IOException
 * @throws TException
 */
private void downloadDistributeStormCode(Map conf, String topologyId,
        String masterCodeDir) throws IOException, TException {

    // STORM_LOCAL_DIR/supervisor/tmp/(UUID)
    String tmproot = StormConfig.supervisorTmpDir(conf) + File.separator
            + UUID.randomUUID().toString();

    // STORM_LOCAL_DIR/supervisor/stormdist/topologyId
    String stormroot = StormConfig.supervisor_stormdist_root(conf,
            topologyId);

    JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir,
            topologyId, true);

    // tmproot/stormjar.jar
    String localFileJarTmp = StormConfig.stormjar_path(tmproot);

    // extract the resources dir from the jar
    JStormUtils.extract_dir_from_jar(localFileJarTmp,
            StormConfig.RESOURCES_SUBDIR, tmproot);

    FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
}
public Taskpage() throws TException, NotAliveException {
    FacesContext ctx = FacesContext.getCurrentInstance();
    if (ctx.getExternalContext().getRequestParameterMap().get("clusterName") != null) {
        clusterName = ctx.getExternalContext().getRequestParameterMap()
                .get("clusterName");
    }
    if (ctx.getExternalContext().getRequestParameterMap().get("topologyid") != null) {
        topologyid = ctx.getExternalContext().getRequestParameterMap()
                .get("topologyid");
    }
    if (ctx.getExternalContext().getRequestParameterMap().get("taskid") != null) {
        taskid = ctx.getExternalContext().getRequestParameterMap()
                .get("taskid");
    }

    if (topologyid == null) {
        throw new NotAliveException("Input topologyId is null ");
    }
    window = UIUtils.getWindow(ctx);

    init();
}
public SpoutPage() throws TException, NotAliveException {
    FacesContext ctx = FacesContext.getCurrentInstance();
    if (ctx.getExternalContext().getRequestParameterMap().get("clusterName") != null) {
        clusterName = (String) ctx.getExternalContext()
                .getRequestParameterMap().get("clusterName");
    }
    if (ctx.getExternalContext().getRequestParameterMap().get("topologyid") != null) {
        topologyid = (String) ctx.getExternalContext()
                .getRequestParameterMap().get("topologyid");
    }
    if (ctx.getExternalContext().getRequestParameterMap().get("componentid") != null) {
        componentid = (String) ctx.getExternalContext()
                .getRequestParameterMap().get("componentid");
    }

    window = UIUtils.getWindow(ctx);

    if (topologyid == null) {
        throw new NotAliveException("Input topologyId is null ");
    }

    init();
}
/**
 * Get the StormTopology by deserializing locally stored topology files.
 *
 * @param id
 *            String: topology id
 * @return StormTopology
 */
@Override
public StormTopology getTopology(String id) throws NotAliveException,
        TException {
    StormTopology topology = null;
    try {
        StormTopology stormtopology = StormConfig
                .read_nimbus_topology_code(conf, id);
        if (stormtopology == null) {
            throw new TException("topology:" + id + " is null");
        }

        Map<Object, Object> topologyConf = (Map<Object, Object>) StormConfig
                .read_nimbus_topology_conf(conf, id);

        topology = Common.system_topology(topologyConf, stormtopology);
    } catch (Exception e) {
        LOG.error("Failed to get topology " + id + ",", e);
        throw new TException("Failed to get system_topology");
    }
    return topology;
}
public boolean process(final TProtocol inProt, final TProtocol outProt)
        throws TException {
    // populating request context
    ReqContext req_context = ReqContext.context();

    TTransport trans = inProt.getTransport();
    if (trans instanceof TMemoryInputTransport) {
        try {
            req_context.setRemoteAddress(InetAddress.getLocalHost());
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    } else if (trans instanceof TSocket) {
        TSocket tsocket = (TSocket) trans;
        // remote address
        Socket socket = tsocket.getSocket();
        req_context.setRemoteAddress(socket.getInetAddress());
    }

    // anonymous user
    req_context.setSubject(null);

    // invoke service handler
    return wrapped.process(inProt, outProt);
}
/**
 * Creates the {storm.local.dir}/nimbus/stormdist/topologyId directory;
 * masterCodeDir is a local directory.
 */
private void downloadCodeFromMaster(Assignment assignment, String topologyId)
        throws IOException, TException {
    try {
        String localRoot = StormConfig.masterStormdistRoot(data.getConf(),
                topologyId);
        String tmpDir = StormConfig.masterInbox(data.getConf()) + "/"
                + UUID.randomUUID().toString();
        String masterCodeDir = assignment.getMasterCodeDir();

        JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir,
                masterCodeDir, topologyId, false);

        FileUtils.moveDirectory(new File(tmpDir), new File(localRoot));
    } catch (TException e) {
        LOG.error("downloadStormCode failed, topologyId:" + topologyId
                + ", masterCodeDir:" + assignment.getMasterCodeDir(), e);
        throw e;
    }
    LOG.info("Finished downloading code for topology id " + topologyId
            + " from " + assignment.getMasterCodeDir());
}
public boolean process(final TProtocol inProt, final TProtocol outProt)
        throws TException {
    // populating request context
    ReqContext req_context = ReqContext.context();

    TTransport trans = inProt.getTransport();
    // Sasl transport
    TSaslServerTransport saslTrans = (TSaslServerTransport) trans;

    // remote address
    TSocket tsocket = (TSocket) saslTrans.getUnderlyingTransport();
    Socket socket = tsocket.getSocket();
    req_context.setRemoteAddress(socket.getInetAddress());

    // remote subject
    SaslServer saslServer = saslTrans.getSaslServer();
    String authId = saslServer.getAuthorizationID();
    Subject remoteUser = new Subject();
    remoteUser.getPrincipals().add(new User(authId));
    req_context.setSubject(remoteUser);

    // invoke service handler
    return wrapped.process(inProt, outProt);
}
public Taskpage() throws TException, NotAliveException {
    FacesContext ctx = FacesContext.getCurrentInstance();
    if (ctx.getExternalContext().getRequestParameterMap().get("clusterName") != null) {
        clusterName = (String) ctx.getExternalContext()
                .getRequestParameterMap().get("clusterName");
    }
    if (ctx.getExternalContext().getRequestParameterMap().get("topologyid") != null) {
        topologyid = (String) ctx.getExternalContext()
                .getRequestParameterMap().get("topologyid");
    }
    if (ctx.getExternalContext().getRequestParameterMap().get("taskid") != null) {
        taskid = (String) ctx.getExternalContext().getRequestParameterMap()
                .get("taskid");
    }

    if (topologyid == null) {
        throw new NotAliveException("Input topologyId is null ");
    }
    window = UIUtils.getWindow(ctx);

    init();
}
@Override
public void fail(Object msgId) {
    DRPCMessageId did = (DRPCMessageId) msgId;
    DistributedRPCInvocations.Iface client;

    if (_local_drpc_id == null) {
        client = _clients.get(did.index);
    } else {
        client = (DistributedRPCInvocations.Iface) ServiceRegistry
                .getService(_local_drpc_id);
    }

    try {
        client.failRequest(did.id);
    } catch (TException e) {
        LOG.error("Failed to fail request", e);
    }
}
public static void downloadFromMaster(Map conf, String file, String localFile)
        throws IOException, TException {
    WritableByteChannel out = null;
    NimbusClient client = null;
    try {
        client = NimbusClient.getConfiguredClient(conf);
        String id = client.getClient().beginFileDownload(file);
        out = Channels.newChannel(new FileOutputStream(localFile));
        while (true) {
            ByteBuffer chunk = client.getClient().downloadChunk(id);
            int written = out.write(chunk);
            if (written == 0) {
                break;
            }
        }
    } finally {
        if (out != null) {
            out.close();
        }
        if (client != null) {
            client.close();
        }
    }
}
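For context, a minimal caller sketch for the helper above; the configuration source and both file paths are assumptions for illustration, not values from the original code.

// Hypothetical caller of downloadFromMaster: the configuration is read from
// the standard Storm client config, and both file paths are made up.
public static void fetchJarFromNimbus() throws IOException, TException {
    Map conf = Utils.readStormConfig();
    String remoteFile = "/path/on/nimbus/stormjar.jar"; // hypothetical remote path
    String localFile = "/tmp/stormjar.jar";             // hypothetical local path
    Utils.downloadFromMaster(conf, remoteFile, localFile);
}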
public void submitTopology() throws NoSuchMethodException, IOException,
        InstantiationException, TException, IllegalAccessException,
        InvocationTargetException, ClassNotFoundException {
    if (topologyBuilder == null) {
        submitFluxTopology(getTopologyName(), getTopologyLocation(),
                getTopologyProperties());
    } else {
        submitTopology(getTopologyName(), topologyBuilder, stormConfig);
    }
}
public void submitFluxTopology(String topologyName, File topologyLoc,
        Properties properties) throws IOException, ClassNotFoundException,
        NoSuchMethodException, InvocationTargetException,
        InstantiationException, IllegalAccessException, TException {
    TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
    Config conf = FluxBuilder.buildConfig(topologyDef);
    ExecutionContext context = new ExecutionContext(topologyDef, conf);
    StormTopology topology = FluxBuilder.buildTopology(context);
    Assert.assertNotNull(topology);
    topology.validate();
    stormCluster.submitTopology(topologyName, conf, topology);
}
public static void downloadCodeFromMaster(Map conf, String localRoot,
        String masterCodeDir, String topologyId, boolean isSupervisor)
        throws IOException, TException {
    FileUtils.forceMkdir(new File(localRoot));
    FileUtils.forceMkdir(new File(StormConfig.stormlib_path(localRoot)));

    String localStormjarPath = StormConfig.stormjar_path(localRoot);
    String masterStormjarPath = StormConfig.stormjar_path(masterCodeDir);
    Utils.downloadFromMaster(conf, masterStormjarPath, localStormjarPath);

    String localStormcodePath = StormConfig.stormcode_path(localRoot);
    String masterStormcodePath = StormConfig.stormcode_path(masterCodeDir);
    Utils.downloadFromMaster(conf, masterStormcodePath, localStormcodePath);

    String localStormConfPath = StormConfig.stormconf_path(localRoot);
    String masterStormConfPath = StormConfig.stormconf_path(masterCodeDir);
    Utils.downloadFromMaster(conf, masterStormConfPath, localStormConfPath);

    Map stormConf = (Map) StormConfig.readLocalObject(topologyId,
            localStormConfPath);
    if (stormConf == null) {
        throw new IOException("Get topology conf error: " + topologyId);
    }

    List<String> libs = (List<String>) stormConf
            .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
    if (libs == null) {
        return;
    }
    for (String libName : libs) {
        String localStormLibPath = StormConfig.stormlib_path(localRoot,
                libName);
        String masterStormLibPath = StormConfig.stormlib_path(masterCodeDir,
                libName);
        Utils.downloadFromMaster(conf, masterStormLibPath, localStormLibPath);
    }
}
@Override
public void submitTopology(String name, String uploadedJarLocation,
        String jsonConf, StormTopology topology)
        throws AlreadyAliveException, InvalidTopologyException,
        TopologyAssignException, TException {
    SubmitOptions options = new SubmitOptions(TopologyInitialStatus.ACTIVE);
    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
            options);
}
@Override
public void beginLibUpload(String libName) throws TException {
    try {
        String parent = PathUtils.parent_path(libName);
        PathUtils.local_mkdirs(parent);
        data.getUploaders().put(libName,
                Channels.newChannel(new FileOutputStream(libName)));
        LOG.info("Begin upload file from client to " + libName);
    } catch (Exception e) {
        LOG.error("Fail to upload jar " + libName, e);
        throw new TException(e);
    }
}
/**
 * Uploads a chunk of topology jar data.
 */
@Override
public void uploadChunk(String location, ByteBuffer chunk) throws TException {
    TimeCacheMap<Object, Object> uploaders = data.getUploaders();
    Object obj = uploaders.get(location);
    if (obj == null) {
        throw new TException(
                "File for that location does not exist (or timed out) "
                        + location);
    }
    try {
        if (obj instanceof WritableByteChannel) {
            WritableByteChannel channel = (WritableByteChannel) obj;
            channel.write(chunk);
            uploaders.put(location, channel);
        } else {
            throw new TException("Object isn't WritableByteChannel for "
                    + location);
        }
    } catch (IOException e) {
        String errMsg = "WritableByteChannel write failed when uploadChunk "
                + location;
        LOG.error(errMsg);
        throw new TException(e);
    }
}
@Override
public ByteBuffer downloadChunk(String id) throws TException {
    TimeCacheMap<Object, Object> downloaders = data.getDownloaders();
    Object obj = downloaders.get(id);
    if (obj == null) {
        throw new TException("Could not find input stream for that id");
    }

    try {
        if (obj instanceof BufferFileInputStream) {
            BufferFileInputStream is = (BufferFileInputStream) obj;
            byte[] ret = is.read();
            if (ret != null) {
                downloaders.put(id, is);
                return ByteBuffer.wrap(ret);
            }
        } else {
            throw new TException("Object isn't BufferFileInputStream for "
                    + id);
        }
    } catch (IOException e) {
        LOG.error("BufferFileInputStream read failed when downloadChunk ", e);
        throw new TException(e);
    }
    byte[] empty = {};
    return ByteBuffer.wrap(empty);
}
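The chunked upload handlers shown earlier (beginLibUpload, uploadChunk, finishFileUpload) are driven by a client that streams a file piece by piece. Below is a minimal client-side sketch, assuming the paired Nimbus Thrift calls (beginFileUpload/uploadChunk/finishFileUpload) and a caller-supplied jar path; it is an illustration, not code from the original source.

// Hypothetical client-side counterpart of the chunked upload handlers above.
public static void uploadJarToNimbus(Map conf, String localJar)
        throws IOException, TException {
    NimbusClient client = NimbusClient.getConfiguredClient(conf);
    try {
        String location = client.getClient().beginFileUpload();
        BufferFileInputStream in = new BufferFileInputStream(localJar);
        while (true) {
            byte[] chunk = in.read();
            if (chunk.length == 0) {
                break; // an empty read marks the end of the file
            }
            client.getClient().uploadChunk(location, ByteBuffer.wrap(chunk));
        }
        client.getClient().finishFileUpload(location);
    } finally {
        client.close();
    }
}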
public DRPCInvocationsClient(String host, int port) {
    try {
        this.host = host;
        this.port = port;
        connect();
    } catch (TException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Download the topology code; there are two cluster modes: local and
 * distributed.
 *
 * @param conf
 * @param topologyId
 * @param masterCodeDir
 * @throws IOException
 */
private void downloadStormCode(Map conf, String topologyId,
        String masterCodeDir) throws IOException, TException {
    String clusterMode = StormConfig.cluster_mode(conf);

    if (clusterMode.endsWith("distributed")) {
        downloadDistributeStormCode(conf, topologyId, masterCodeDir);
    } else if (clusterMode.endsWith("local")) {
        downloadLocalStormCode(conf, topologyId, masterCodeDir);
    }
}
@Override
public void killTopologyWithOpts(String name, KillOptions options)
        throws NotAliveException {
    try {
        state.getNimbus().killTopologyWithOpts(name, options);
    } catch (TException e) {
        LOG.error("Failed to kill topology " + name, e);
        throw new RuntimeException(e);
    }
}
@Override
public ClusterSummary getClusterInfo() {
    try {
        return state.getNimbus().getClusterInfo();
    } catch (TException e) {
        LOG.error("Failed to get cluster info", e);
    }
    return null;
}
/**
 * @param args
 * @throws DRPCExecutionException
 * @throws TException
 */
public static void main(String[] args) throws TException,
        DRPCExecutionException {
    if (args.length < 1) {
        throw new IllegalArgumentException("Invalid parameter");
    }

    // "foo.com/blog/1" "engineering.twitter.com/blog/5"
    DRPCClient client = new DRPCClient(args[0], 4772);
    String result = client.execute(ReachTopology.TOPOLOGY_NAME,
            "tech.backtype.com/blog/123");

    System.out.println("\n!!! Drpc result:" + result);
}
/**
 * Prepares a topology for aggregation.
 *
 * @param topology the topology
 * @param mapping the mapping
 * @return the pipeline system part representing the pipeline
 * @throws NotAliveException in case that the requested topology is not alive
 * @throws TException in case of problems accessing the remote topology info
 */
private PipelineSystemPart preparePipelineAggregation(TopologyInfo topology,
        INameMapping mapping) throws TException, NotAliveException {
    SystemState state = getState();
    String pipelineName = mapping.getPipelineName();
    PipelineSystemPart part = state.obtainPipeline(pipelineName); // exists or creates
    if (null == part.getTopology()) {
        PipelineInfo info = MonitoringManager.getPipelineInfo(pipelineName);
        Map<StormTopology, TopologyInfo> topologies = new HashMap<StormTopology, TopologyInfo>();
        topologies.put(connection.getTopology(topology), topology);
        for (PipelineInfo subInfo : info.getSubPipelines()) {
            String subName = subInfo.getName();
            try {
                TopologyInfo si = connection.getTopologyInfoByName(subName);
                topologies.put(connection.getTopology(si), si);
            } catch (NotAliveException e) {
                LOGGER.info("Sub-topology not alive: " + subName);
            }
        }
        PipelineTopology topo = Utils.buildPipelineTopology(topologies, mapping);
        part.setTopology(topo);
        if (null != topo) {
            LOGGER.info("TOPOLOGY for " + mapping.getPipelineName() + " " + topo);
        }
    }
    if (PipelineLifecycleEvent.Status.INITIALIZED == part.getStatus()
            && !DataManager.isStarted()) {
        if (System.currentTimeMillis() - part.getLastStateChange() // TODO WORKAROUND FOR DML
                > MonitoringConfiguration.getPipelineStartNotificationDelay()) {
            part.changeStatus(PipelineLifecycleEvent.Status.STARTED, true);
        }
    }
    return part;
}
public BoltPage(String clusterName, String topologyId, String componentId,
        String window) throws TException, NotAliveException {
    this.clusterName = clusterName;
    this.topologyid = topologyId;
    this.componentid = componentId;
    this.window = window;

    init();
}
public SpoutPage(String clusterName, String topologyId, String componentId,
        String window) throws TException, NotAliveException {
    this.clusterName = clusterName;
    this.topologyid = topologyId;
    this.componentid = componentId;
    this.window = window;

    init();
}
@Override
public TopologyInfo getTopologyInfo(String topologyName) throws TException,
        NotAliveException {
    try {
        return cluster.getTopologyInfo(topologyName);
    } catch (Throwable e) { // underlying interface does not declare exceptions
        if (e instanceof NotAliveException
                || e.getCause() instanceof NotAliveException) {
            throw e;
        } else {
            throw new TException(e.getMessage(), e.getCause());
        }
    }
}
/**
 * Returns the cluster summary.
 *
 * @return the cluster summary
 * @throws TException in case of problems accessing the remote cluster summary
 */
public ClusterSummary getClusterSummary() throws TException {
    ClusterSummary result;
    if (null != client) {
        result = client.getClusterInfo();
    } else if (null != localCluster) {
        result = localCluster.getClusterInfo();
    } else {
        throw new TException("connection not open");
    }
    return result;
}
@Override
public DRPCRequest fetchRequest(String func) throws TException {
    try {
        if (client == null) {
            connect();
        }
        return client.fetchRequest(func);
    } catch (TException e) {
        client = null;
        throw e;
    }
}
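For context, a minimal sketch of how a caller might poll a DRPC server with fetchRequest and answer via result or failRequest; the host, port, function name, and reply construction are illustrative assumptions rather than code from the original source.

// Hypothetical single polling pass against a DRPC server; "drpc-host", the
// port, and the "reach" function name are made up for illustration.
public static void pollOnce() throws TException {
    DRPCInvocationsClient invocations = new DRPCInvocationsClient("drpc-host", 3773);
    DRPCRequest req = invocations.fetchRequest("reach");
    // an empty request id means there is currently no pending request
    if (!req.get_request_id().isEmpty()) {
        try {
            String answer = "reach:" + req.get_func_args(); // stand-in for real work
            invocations.result(req.get_request_id(), answer);
        } catch (Exception e) {
            invocations.failRequest(req.get_request_id());
        }
    }
}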