public JobProcess(String hostId, CuratorFramework client, String zkPath, String mountPoint, NamingService ns) throws Exception { if (StringUtils.isBlank(zkPath)) throw new IllegalArgumentException("zkPath is blank"); this.zkPath = zkPath; this.client = client; this.hostId = hostId; this.ns = ns; type = isReduce(zkPath) ? REDUCE : MAP; String jobId = getId(zkPath); if (StringUtils.isBlank(jobId)) throw new IllegalArgumentException("unable to find jobId from zkPath: " + zkPath); String jobDefJson = new String(client.getData().forPath(zkPath)).trim(); log.info("jobDef at zk path {} => {}, type: {}", zkPath, jobDefJson, type); jd = new JobDefinition(jobDefJson); je = jobExecFactory.newInstance(hostId, jd, jobId, mountPoint, type); QueueBuilder<JobInput> builder = QueueBuilder.builder(client, new InputQueueConsumer(), serializer, zkPath + "/live/in"); builder.buildQueue().start(); }
private DistributedQueue<JobInput> getQueue(String instanceId) throws Exception { DistributedQueue<JobInput> cachedQ = queueMap.get(instanceId); if (cachedQ != null) return cachedQ; String json = jobDef.toString(); byte[] bytes = json.getBytes(); String jobs = "/urmia/1/" + instanceId + "/jobs"; if (client.checkExists().forPath(jobs) == null) client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobs); String p = jobs + "/" + jobId; log.info("creating job: {} -> {}", p, json); client.create().withMode(CreateMode.PERSISTENT).forPath(p, bytes); String ip = p + "/live/in"; QueueBuilder<JobInput> builder = QueueBuilder.builder(client, null, serializer, ip); client.checkExists().usingWatcher(new QueueExistsWatcher()).forPath(p); Thread.sleep(500); // todo: why? DistributedQueue<JobInput> q = builder.buildQueue(); q.start(); queueMap.put(instanceId, q); finishedMap.put(instanceId, Boolean.FALSE); inputCounter.put(instanceId, new AtomicInteger(0)); statHandler.incTasks(1); return q; }
private DistributedQueue<JobInput> getReduceQueue(String instanceId) throws Exception { String json = jobDef.toString(); byte[] bytes = json.getBytes(); String jobs = "/urmia/1/" + instanceId + "/jobs"; if (client.checkExists().forPath(jobs) == null) client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobs); String p = jobs + "/" + jobId + "-reduce"; log.info("creating job: {} -> {}", p, json); client.create().withMode(CreateMode.PERSISTENT).forPath(p, bytes); String ip = p + "/live/in"; QueueBuilder<JobInput> builder = QueueBuilder.builder(client, null, serializer, ip); client.checkExists().usingWatcher(new QueueExistsWatcher()).forPath(p); Thread.sleep(500); // todo: why? DistributedQueue<JobInput> q = builder.buildQueue(); q.start(); return q; }
public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3)); client.start(); // create my node String hostname = InetAddress.getLocalHost().getHostName(); String jobs = "/urmia/1/" + hostname + "/jobs"; JobExec lsJob = new JobExec.Shell("ls"); JobDefinition.Phase lsJobPhase = new JobDefinition.Phase(lsJob); JobDefinition jobDef = new JobDefinition("tck", Lists.newArrayList(lsJobPhase)); byte[] lsJobBytes = jobDef.toString().getBytes(); if (client.checkExists().forPath(jobs) == null) client.create().withMode(CreateMode.PERSISTENT).forPath(jobs); String i = new RandomUuidImpl().next(); log.info("creating job: {}", i); String p = jobs + "/" + i; client.create().withMode(CreateMode.PERSISTENT).forPath(p, lsJobBytes); String ip = p + "/live/in"; QueueBuilder<JobInput> builder = QueueBuilder.builder(client, null, serializer, ip); Thread.sleep(500); DistributedQueue<JobInput> q = builder.buildQueue(); q.start(); q.put(new LineJobInput("/")); q.put(new LineJobInput("/tmp")); q.put(END); q.flushPuts(1, TimeUnit.SECONDS); Thread.sleep(1000); }