Java 类org.apache.curator.framework.recipes.queue.QueueBuilder 实例源码

项目:urmia    文件:JobProcess.java   
public JobProcess(String hostId, CuratorFramework client, String zkPath, String mountPoint, NamingService ns) throws Exception {

        if (StringUtils.isBlank(zkPath))
            throw new IllegalArgumentException("zkPath is blank");

        this.zkPath = zkPath;
        this.client = client;
        this.hostId = hostId;
        this.ns = ns;

        type = isReduce(zkPath) ? REDUCE : MAP;
        String jobId = getId(zkPath);

        if (StringUtils.isBlank(jobId))
            throw new IllegalArgumentException("unable to find jobId from zkPath: " + zkPath);

        String jobDefJson = new String(client.getData().forPath(zkPath)).trim();
        log.info("jobDef at zk path {} => {}, type: {}", zkPath, jobDefJson, type);

        jd = new JobDefinition(jobDefJson);

        je = jobExecFactory.newInstance(hostId, jd, jobId, mountPoint, type);

        QueueBuilder<JobInput> builder = QueueBuilder.builder(client, new InputQueueConsumer(), serializer, zkPath + "/live/in");
        builder.buildQueue().start();
    }
项目:urmia    文件:InputAwareZkJobQueue.java   
private DistributedQueue<JobInput> getQueue(String instanceId) throws Exception {

        DistributedQueue<JobInput> cachedQ = queueMap.get(instanceId);
        if (cachedQ != null)
            return cachedQ;

        String json = jobDef.toString();
        byte[] bytes = json.getBytes();

        String jobs = "/urmia/1/" + instanceId + "/jobs";

        if (client.checkExists().forPath(jobs) == null)
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobs);


        String p = jobs + "/" + jobId;
        log.info("creating job: {} -> {}", p, json);

        client.create().withMode(CreateMode.PERSISTENT).forPath(p, bytes);

        String ip = p + "/live/in";

        QueueBuilder<JobInput> builder = QueueBuilder.builder(client, null, serializer, ip);

        client.checkExists().usingWatcher(new QueueExistsWatcher()).forPath(p);

        Thread.sleep(500); // todo: why?

        DistributedQueue<JobInput> q = builder.buildQueue();
        q.start();

        queueMap.put(instanceId, q);
        finishedMap.put(instanceId, Boolean.FALSE);
        inputCounter.put(instanceId, new AtomicInteger(0));
        statHandler.incTasks(1);

        return q;
    }
项目:urmia    文件:InputAwareZkJobQueue.java   
private DistributedQueue<JobInput> getReduceQueue(String instanceId) throws Exception {

        String json = jobDef.toString();
        byte[] bytes = json.getBytes();

        String jobs = "/urmia/1/" + instanceId + "/jobs";

        if (client.checkExists().forPath(jobs) == null)
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(jobs);


        String p = jobs + "/" + jobId + "-reduce";
        log.info("creating job: {} -> {}", p, json);

        client.create().withMode(CreateMode.PERSISTENT).forPath(p, bytes);

        String ip = p + "/live/in";

        QueueBuilder<JobInput> builder = QueueBuilder.builder(client, null, serializer, ip);

        client.checkExists().usingWatcher(new QueueExistsWatcher()).forPath(p);

        Thread.sleep(500); // todo: why?

        DistributedQueue<JobInput> q = builder.buildQueue();
        q.start();

        return q;
    }
项目:urmia    文件:JobZkPublish.java   
public static void main(String[] args) throws Exception {

        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3));
        client.start();

        // create my node
        String hostname = InetAddress.getLocalHost().getHostName();

        String jobs = "/urmia/1/" + hostname + "/jobs";

        JobExec lsJob = new JobExec.Shell("ls");
        JobDefinition.Phase lsJobPhase = new JobDefinition.Phase(lsJob);
        JobDefinition jobDef = new JobDefinition("tck", Lists.newArrayList(lsJobPhase));

        byte[] lsJobBytes = jobDef.toString().getBytes();

        if (client.checkExists().forPath(jobs) == null)
            client.create().withMode(CreateMode.PERSISTENT).forPath(jobs);

        String i = new RandomUuidImpl().next();

        log.info("creating job: {}", i);
        String p = jobs + "/" + i;

        client.create().withMode(CreateMode.PERSISTENT).forPath(p, lsJobBytes);

        String ip = p + "/live/in";

        QueueBuilder<JobInput> builder = QueueBuilder.builder(client, null, serializer, ip);

        Thread.sleep(500);

        DistributedQueue<JobInput> q = builder.buildQueue();

        q.start();

        q.put(new LineJobInput("/"));
        q.put(new LineJobInput("/tmp"));
        q.put(END);

        q.flushPuts(1, TimeUnit.SECONDS);

        Thread.sleep(1000);

    }