private ConnectorSplit createSplit(ShardNodes shard) { UUID shardId = shard.getShardUuid(); Collection<String> nodeIds = shard.getNodeIdentifiers(); List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds); if (addresses.isEmpty()) { if (!backupAvailable) { throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds)); } // Pick a random node and optimistically assign the shard to it. // That node will restore the shard from the backup location. Set<Node> availableNodes = nodeSupplier.getWorkerNodes(); if (availableNodes.isEmpty()) { throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query"); } Node node = selectRandom(availableNodes); shardManager.assignShard(tableId, shardId, node.getNodeIdentifier()); addresses = ImmutableList.of(node.getHostAndPort()); } return new RaptorSplit(connectorId, shardId, addresses, effectivePredicate, transactionId); }
/**
 * Blocks until the server reports at least one active node with the given
 * catalog, polling after a 10 ms sleep each round. Returns early (with the
 * interrupt flag restored) if the thread is interrupted while sleeping.
 *
 * @param catalogName name of the catalog to wait for
 */
private void refreshNodes(String catalogName)
{
    while (true) {
        try {
            MILLISECONDS.sleep(10);
        }
        catch (InterruptedException e) {
            // preserve the interrupt for the caller and stop waiting
            Thread.currentThread().interrupt();
            return;
        }

        Set<Node> visibleNodes = server.getActiveNodesWithConnector(catalogName);
        if (!visibleNodes.isEmpty()) {
            return;
        }
    }
}
/**
 * For every node, constrains the layout to that node's identifier and checks
 * that exactly one split is produced, addressed to that single node.
 */
@Test
public void testPredicatePushdown()
        throws Exception
{
    for (Node node : nodes) {
        String expectedHost = node.getNodeIdentifier();

        // fix the column to this node's identifier
        TupleDomain<ColumnHandle> singleNodeDomain = TupleDomain.fromFixedValues(
                ImmutableMap.of(columnHandle, NullableValue.of(VARCHAR, utf8Slice(expectedHost))));
        ConnectorTableLayoutHandle constrainedLayout = new JmxTableLayoutHandle(tableHandle, singleNodeDomain);

        ConnectorSplitSource source = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, constrainedLayout);
        List<ConnectorSplit> produced = getAllSplits(source);

        assertEquals(produced.size(), 1);
        ConnectorSplit onlySplit = produced.get(0);
        assertEquals(onlySplit.getAddresses().size(), 1);
        assertEquals(onlySplit.getAddresses().get(0).getHostText(), expectedHost);
    }
}
/**
 * With an unconstrained layout, one split must be produced per node, each
 * carrying exactly one address, and the set of split hosts must equal the set
 * of node identifiers.
 */
@Test
public void testNoPredicate()
        throws Exception
{
    ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all());
    ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout);
    List<ConnectorSplit> allSplits = getAllSplits(splitSource);
    assertEquals(allSplits.size(), nodes.size());

    // Expected hosts come from the known nodes; actual hosts come from the splits.
    // (The original had these two names swapped, which made a failure message misleading.)
    Set<String> expectedNodes = nodes.stream()
            .map(Node::getNodeIdentifier)
            .collect(toSet());

    Set<String> actualNodes = new HashSet<>();
    for (ConnectorSplit split : allSplits) {
        List<HostAddress> addresses = split.getAddresses();
        assertEquals(addresses.size(), 1);
        actualNodes.add(addresses.get(0).getHostText());
    }

    assertEquals(actualNodes, expectedNodes);
}
/**
 * Delegates task creation to the wrapped factory and attaches an
 * {@code UpdatePeakMemory} state-change listener, bound to this object's
 * state machine, before handing the task back.
 */
@Override
public RemoteTask createRemoteTask(
        Session session,
        TaskId taskId,
        Node node,
        int partition,
        PlanFragment fragment,
        Multimap<PlanNodeId, Split> initialSplits,
        OutputBuffers outputBuffers,
        PartitionedSplitCountTracker partitionedSplitCountTracker)
{
    RemoteTask created = remoteTaskFactory.createRemoteTask(
            session,
            taskId,
            node,
            partition,
            fragment,
            initialSplits,
            outputBuffers,
            partitionedSplitCountTracker);

    created.addStateChangeListener(new UpdatePeakMemory(stateMachine));
    return created;
}
/**
 * Sends splits for the fragment's partitioned source to the given node.
 *
 * <p>If the node has no task yet, a new task is scheduled with the splits and
 * returned; otherwise the splits are added to the first existing task on the
 * node and an empty set is returned.
 *
 * @param node target node, must not be null
 * @param partition partition id used when a new task has to be scheduled
 * @param splits splits to deliver, must not be null
 * @return the newly created tasks (empty when an existing task was reused)
 * @throws IllegalStateException if the fragment has no partitioned source
 */
public synchronized Set<RemoteTask> scheduleSplits(Node node, int partition, Iterable<Split> splits)
{
    requireNonNull(node, "node is null");
    requireNonNull(splits, "splits is null");

    PlanNodeId partitionedSource = stateMachine.getFragment().getPartitionedSource();
    checkState(partitionedSource != null, "Partitioned source is null");

    Collection<RemoteTask> existingTasks = this.tasks.get(node);
    if (existingTasks != null) {
        // reuse the first task already running on this node
        existingTasks.iterator().next().addSplits(partitionedSource, splits);
        return ImmutableSet.of();
    }

    return ImmutableSet.of(scheduleTask(node, partition, partitionedSource, splits));
}
@Test public void testTaskCompletion() throws Exception { MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor); Node chosenNode = Iterables.get(nodeManager.getActiveDatasourceNodes("foo"), 0); TaskId taskId = new TaskId(new StageId("test", "1"), "1"); RemoteTask remoteTask = remoteTaskFactory.createTableScanTask( taskId, chosenNode, ImmutableList.of(new Split("foo", TestingTransactionHandle.create("foo"), new TestSplitRemote())), nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId)); nodeTaskMap.addTask(chosenNode, remoteTask); assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 1); remoteTask.abort(); MILLISECONDS.sleep(100); // Sleep until cache expires assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 0); remoteTask.abort(); assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 0); }
public static List<Node> selectNodes(int limit, Iterator<Node> candidates, boolean doubleScheduling) { checkArgument(limit > 0, "limit must be at least 1"); List<Node> selected = new ArrayList<>(limit); while (selected.size() < limit && candidates.hasNext()) { selected.add(candidates.next()); } if (doubleScheduling && !selected.isEmpty()) { // Cycle the nodes until we reach the limit int uniqueNodes = selected.size(); int i = 0; while (selected.size() < limit) { if (i >= uniqueNodes) { i = 0; } selected.add(selected.get(i)); i++; } } return selected; }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout) { TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable(); Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId); checkState(!nodes.isEmpty(), "No TPCH nodes available"); int totalParts = nodes.size() * splitsPerNode; int partNumber = 0; // Split the data using split and skew by the number of nodes available. ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); for (Node node : nodes) { for (int i = 0; i < splitsPerNode; i++) { splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort()))); partNumber++; } } return new FixedSplitSource(connectorId, splits.build()); }
/**
 * Picks a node from {@code candidates} to receive a split.
 *
 * <p>The first candidate whose total split count is below {@code maxSplitsPerNode}
 * is returned immediately. Otherwise, among the "full" candidates examined, the
 * one with the smallest queued split count is returned, provided that count is
 * below {@code maxSplitsPerNodePerTaskWhenFull}. Examination stops once at least
 * {@code minCandidatesWhenFull} full candidates were seen and a fallback exists,
 * or when the candidates run out.
 *
 * @param candidates nodes to consider, consumed from the iterator's current position
 * @param minCandidatesWhenFull minimum number of full candidates to examine before settling on a fallback
 * @param maxSplitsPerNodePerTaskWhenFull exclusive upper bound on the queued split count of a fallback node
 * @param assignmentStats source of per-node split counts
 * @return the chosen node, or {@code null} if no candidate qualifies
 */
@Nullable
private Node bestNodeSplitCount(Iterator<Node> candidates, int minCandidatesWhenFull, int maxSplitsPerNodePerTaskWhenFull, NodeAssignmentStats assignmentStats)
{
    Node bestQueueNotFull = null;
    int min = Integer.MAX_VALUE;
    int fullCandidatesConsidered = 0;

    while (candidates.hasNext() && (fullCandidatesConsidered < minCandidatesWhenFull || bestQueueNotFull == null)) {
        Node node = candidates.next();
        if (assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode) {
            // node still has free capacity: take it right away
            return node;
        }
        fullCandidatesConsidered++;

        int totalSplitCount = assignmentStats.getTotalQueuedSplitCount(node);
        if (totalSplitCount < min && totalSplitCount < maxSplitsPerNodePerTaskWhenFull) {
            // remember the smallest queue seen so far; without this update the
            // last qualifying node would win regardless of its queue length
            min = totalSplitCount;
            bestQueueNotFull = node;
        }
    }
    return bestQueueNotFull;
}
private Set<RemoteTask> finalizeTaskCreationIfNecessary() { // only lock down tasks if there is a sub stage that could block waiting for this stage to create all tasks if (stage.getFragment().isLeaf()) { return ImmutableSet.of(); } splitPlacementPolicy.lockDownNodes(); Set<Node> scheduledNodes = stage.getScheduledNodes(); Set<RemoteTask> newTasks = splitPlacementPolicy.allNodes().stream() .filter(node -> !scheduledNodes.contains(node)) .map(node -> stage.scheduleTask(node, BROADCAST_PARTITION_ID)) .collect(toImmutableSet()); // notify listeners that we have scheduled all tasks so they can set no more buffers or exchange splits stage.transitionToSchedulingSplits(); return newTasks; }
/**
 * Creates a mock task whose fragment is a single table scan over one VARCHAR
 * symbol named {@code column}, seeded with the given splits for the scan's
 * plan node.
 *
 * @param taskId id of the task to create
 * @param newNode node the task is created for
 * @param splits initial splits, all registered under the table scan's plan node id
 * @param partitionedSplitCountTracker tracker passed through to the created task
 * @return the created mock task
 */
public MockRemoteTask createTableScanTask(TaskId taskId, Node newNode, List<Split> splits, PartitionedSplitCountTracker partitionedSplitCountTracker)
{
    PlanNodeId scanId = new PlanNodeId("sourceId");
    Symbol column = new Symbol("column");

    TableScanNode tableScan = new TableScanNode(
            scanId,
            new TableHandle("test", new TestingTableHandle()),
            ImmutableList.of(column),
            ImmutableMap.of(column, new TestingColumnHandle("column")),
            Optional.empty(),
            TupleDomain.all(),
            null);

    PlanFragment fragment = new PlanFragment(
            new PlanFragmentId("test"),
            tableScan,
            ImmutableMap.<Symbol, Type>of(column, VARCHAR),
            ImmutableList.of(column),
            SOURCE,
            scanId,
            Optional.empty());

    // every supplied split belongs to the single table scan source
    ImmutableMultimap.Builder<PlanNodeId, Split> splitsBySource = ImmutableMultimap.builder();
    for (Split split : splits) {
        splitsBySource.put(scanId, split);
    }

    return createRemoteTask(
            TEST_SESSION,
            taskId,
            newNode,
            0,
            fragment,
            splitsBySource.build(),
            OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS,
            partitionedSplitCountTracker);
}
/**
 * Scheduling over a single node must finish immediately, be unblocked, and
 * create exactly one task, placed on that node.
 */
@Test
public void testSingleNode()
        throws Exception
{
    FixedCountScheduler nodeScheduler = new FixedCountScheduler(
            (node, partition) -> taskFactory.createTableScanTask(
                    new TaskId(new StageId("test", "1"), "1"),
                    (Node) node, ImmutableList.of(),
                    new PartitionedSplitCountTracker(delta -> { })),
            generateRandomNodes(1));

    ScheduleResult result = nodeScheduler.schedule();
    assertTrue(result.isFinished());
    assertTrue(result.getBlocked().isDone());
    assertEquals(result.getNewTasks().size(), 1);
    // The original computed equals("other 0") and discarded the result, so the
    // node id was never verified; assert it explicitly.
    assertEquals(result.getNewTasks().iterator().next().getNodeId(), "other 0");
}
/**
 * Scheduling over five nodes must finish immediately, be unblocked, and create
 * five tasks with five distinct node ids.
 */
@Test
public void testMultipleNodes()
        throws Exception
{
    FixedCountScheduler scheduler = new FixedCountScheduler(
            (node, partition) -> taskFactory.createTableScanTask(
                    new TaskId(new StageId("test", "1"), "1"),
                    (Node) node, ImmutableList.of(),
                    new PartitionedSplitCountTracker(delta -> { })),
            generateRandomNodes(5));

    ScheduleResult outcome = scheduler.schedule();

    assertTrue(outcome.isFinished());
    assertTrue(outcome.getBlocked().isDone());
    assertEquals(outcome.getNewTasks().size(), 5);

    // collapse to a set so that duplicate node ids would shrink the count
    int distinctNodeIds = outcome.getNewTasks().stream()
            .map(RemoteTask::getNodeId)
            .collect(toImmutableSet())
            .size();
    assertEquals(distinctNodeIds, 5);
}
/**
 * Returns a single split for the layout's table, addressed to the host and
 * port of every node returned by {@code getRequiredWorkerNodes()}.
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    RestConnectorTableLayoutHandle restLayout = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout");

    List<HostAddress> workerAddresses = nodeManager.getRequiredWorkerNodes()
            .stream()
            .map(Node::getHostAndPort)
            .collect(toList());

    RestConnectorSplit onlySplit = new RestConnectorSplit(restLayout.getTableHandle(), workerAddresses);
    return new FixedSplitSource(ImmutableList.of(onlySplit));
}
/**
 * Resolves node identifiers to host addresses, silently skipping identifiers
 * that have no entry in {@code nodeMap}.
 *
 * @param nodeMap known nodes keyed by node identifier
 * @param nodeIdentifiers identifiers to resolve
 * @return addresses of the resolvable nodes, in iteration order of the identifiers
 */
private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
{
    ImmutableList.Builder<HostAddress> resolved = ImmutableList.builder();
    for (String identifier : nodeIdentifiers) {
        Node known = nodeMap.get(identifier);
        if (known == null) {
            // unknown node: nothing to contribute
            continue;
        }
        resolved.add(known.getHostAndPort());
    }
    return resolved.build();
}
/**
 * Builds a testing node manager containing the current node followed by the
 * given other nodes.
 *
 * @param current identifier of the node reported as the current node
 * @param others identifiers of the additional nodes
 */
private static NodeManager createNodeManager(String current, String... others)
{
    Node self = new TestingNode(current);

    ImmutableSet.Builder<Node> allNodes = ImmutableSet.builder();
    allNodes.add(self);
    for (int i = 0; i < others.length; i++) {
        allNodes.add(new TestingNode(others[i]));
    }

    return new TestingNodeManager(allNodes.build(), self);
}
/**
 * Two tasks on the same node (two splits and one split) must add up to three
 * partitioned splits; aborting each task must remove exactly its own splits.
 */
@Test
public void testSplitCount()
        throws Exception
{
    MockRemoteTaskFactory factory = new MockRemoteTaskFactory(remoteTaskExecutor);
    Node target = Iterables.get(nodeManager.getActiveDatasourceNodes("foo"), 0);

    // first task: two splits
    TaskId firstId = new TaskId(new StageId("test", "1"), "1");
    RemoteTask firstTask = factory.createTableScanTask(
            firstId,
            target,
            ImmutableList.of(
                    new Split("foo", TestingTransactionHandle.create("foo"), new TestSplitRemote()),
                    new Split("bar", TestingTransactionHandle.create("bar"), new TestSplitRemote())),
            nodeTaskMap.createPartitionedSplitCountTracker(target, firstId));

    // second task: one split
    TaskId secondId = new TaskId(new StageId("test", "1"), "2");
    RemoteTask secondTask = factory.createTableScanTask(
            secondId,
            target,
            ImmutableList.of(new Split("foo2", TestingTransactionHandle.create("foo2"), new TestSplitRemote())),
            nodeTaskMap.createPartitionedSplitCountTracker(target, secondId));

    nodeTaskMap.addTask(target, firstTask);
    nodeTaskMap.addTask(target, secondTask);
    assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(target), 3);

    firstTask.abort();
    assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(target), 1);

    secondTask.abort();
    assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(target), 0);
}
/**
 * Refreshes each server in turn and checks that it sees as many active nodes
 * with the connector as there are servers. Stops at the first server that
 * does not.
 *
 * @param connectorId connector to look for
 * @return {@code true} only if every server sees the connector on all servers
 */
private boolean isConnectionVisibleToAllNodes(String connectorId)
{
    for (TestingPrestoServer current : servers) {
        current.refreshNodes();
        Set<Node> withConnector = current.getActiveNodesWithConnector(connectorId);
        boolean seesEveryServer = withConnector.size() == servers.size();
        if (!seesEveryServer) {
            return false;
        }
    }
    return true;
}
/**
 * Prepares the benchmark state: creates {@code NODES} nodes, gives each one a
 * mock task preloaded with {@code MAX_SPLITS_PER_NODE + MAX_PENDING_SPLITS_PER_TASK_PER_NODE}
 * splits, generates {@code SPLITS} splits with random data-node indices to be
 * assigned, and builds the node selector under test.
 */
@Setup
public void setup()
        throws NoSuchMethodException, IllegalAccessException
{
    TestingTransactionHandle transactionHandle = TestingTransactionHandle.create("foo");

    finalizerService.start();
    NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);

    ImmutableList.Builder<Node> allNodes = ImmutableList.builder();
    for (int host = 0; host < NODES; host++) {
        URI nodeUri = URI.create("http://" + addressForHost(host).getHostText());
        allNodes.add(new PrestoNode("node" + host, nodeUri, NodeVersion.UNKNOWN));
    }
    List<Node> nodes = allNodes.build();

    MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s")));

    // one task per node, preloaded with running plus pending splits
    int preloadedSplits = MAX_SPLITS_PER_NODE + MAX_PENDING_SPLITS_PER_TASK_PER_NODE;
    for (int index = 0; index < nodes.size(); index++) {
        Node node = nodes.get(index);

        ImmutableList.Builder<Split> preload = ImmutableList.builder();
        for (int count = 0; count < preloadedSplits; count++) {
            preload.add(new Split("foo", transactionHandle, new TestSplitRemote(index)));
        }

        TaskId taskId = new TaskId(new StageId("test", "1"), String.valueOf(index));
        MockRemoteTaskFactory.MockRemoteTask task = remoteTaskFactory.createTableScanTask(
                taskId,
                node,
                preload.build(),
                nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));

        nodeTaskMap.addTask(node, task);
        taskMap.put(node, task);
    }

    // splits to be assigned during the benchmark
    for (int created = 0; created < SPLITS; created++) {
        int dataNode = ThreadLocalRandom.current().nextInt(DATA_NODES);
        splits.add(new Split("foo", transactionHandle, new TestSplitRemote(dataNode)));
    }

    InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    nodeManager.addNode("foo", nodes);

    NodeScheduler scheduler = new NodeScheduler(getNetworkTopology(), nodeManager, getNodeSchedulerConfig(), nodeTaskMap);
    nodeSelector = scheduler.createNodeSelector("foo");
}
/**
 * Creates the splits for a system table according to its distribution mode:
 * one split on the current node for {@code SINGLE_COORDINATOR}, one per
 * coordinator for {@code ALL_COORDINATORS}, and one per active node for
 * {@code ALL_NODES}. Any other mode yields an empty split source.
 */
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
    SystemTableLayoutHandle systemLayout = checkType(layout, SystemTableLayoutHandle.class, "layout");
    SystemTableHandle table = systemLayout.getTable();
    TupleDomain<ColumnHandle> constraint = systemLayout.getConstraint();

    SystemTable systemTable = tables.get(table.getSchemaTableName());
    Distribution distribution = systemTable.getDistribution();

    if (distribution == SINGLE_COORDINATOR) {
        HostAddress currentAddress = nodeManager.getCurrentNode().getHostAndPort();
        ConnectorSplit onlySplit = new SystemSplit(table.getConnectorId(), table, currentAddress, constraint);
        return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(onlySplit));
    }

    // collect the distinct target nodes for the distributed modes
    ImmutableSet.Builder<Node> targets = ImmutableSet.builder();
    if (distribution == ALL_COORDINATORS) {
        targets.addAll(nodeManager.getCoordinators());
    }
    else if (distribution == ALL_NODES) {
        targets.addAll(nodeManager.getNodes(ACTIVE));
    }
    Set<Node> targetNodes = targets.build();

    ImmutableList.Builder<ConnectorSplit> result = ImmutableList.builder();
    for (Node target : targetNodes) {
        result.add(new SystemSplit(table.getConnectorId(), table, target.getHostAndPort(), constraint));
    }
    return new FixedSplitSource(GlobalSystemConnector.NAME, result.build());
}
/**
 * Returns the string form of the node's version when the node is a
 * {@code PrestoNode}, and the empty string for any other node.
 */
private static String getNodeVersion(Node node)
{
    return (node instanceof PrestoNode)
            ? ((PrestoNode) node).getNodeVersion().toString()
            : "";
}
/**
 * Without multiple tasks per node, asking for ten random nodes yields the
 * three available ones; with the option enabled, asking for nine yields nine
 * entries, three for each of the three nodes.
 */
@Test
public void testMultipleTasksPerNode()
{
    NodeSchedulerConfig config = new NodeSchedulerConfig()
            .setMaxSplitsPerNode(20)
            .setIncludeCoordinator(false)
            .setMaxPendingSplitsPerNodePerTask(10);

    // default: each node can be selected only once
    NodeSelector singleTaskSelector = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, config, nodeTaskMap)
            .createNodeSelector("foo");
    List<Node> selected = singleTaskSelector.selectRandomNodes(10);
    assertEquals(selected.size(), 3);

    // with multiple tasks per node the selection is filled up by repeating nodes
    config.setMultipleTasksPerNodeEnabled(true);
    NodeSelector multiTaskSelector = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, config, nodeTaskMap)
            .createNodeSelector("foo");
    selected = multiTaskSelector.selectRandomNodes(9);
    assertEquals(selected.size(), 9);

    Map<String, Integer> occurrences = new HashMap<>();
    for (Node node : selected) {
        occurrences.merge(node.getNodeIdentifier(), 1, Integer::sum);
    }
    assertEquals(occurrences.get("other1").intValue(), 3);
    assertEquals(occurrences.get("other2").intValue(), 3);
    assertEquals(occurrences.get("other3").intValue(), 3);
}
/**
 * Returns the {@code NodeTasks} registered for the node, registering a new
 * one through {@code addNodeTask} when the map has no entry yet.
 */
private NodeTasks createOrGetNodeTasks(Node node)
{
    NodeTasks existing = nodeTasksMap.get(node);
    if (existing != null) {
        return existing;
    }
    return addNodeTask(node);
}
/**
 * Registers a fresh {@code NodeTasks} for the node unless one is already
 * present, and returns whichever instance ends up in the map.
 */
private NodeTasks addNodeTask(Node node)
{
    NodeTasks candidate = new NodeTasks(finalizerService);
    NodeTasks previous = nodeTasksMap.putIfAbsent(node, candidate);
    // putIfAbsent returns null when our candidate was stored
    return (previous != null) ? previous : candidate;
}
@Test public void testMaxSplitsPerNode() throws Exception { TestingTransactionHandle transactionHandle = TestingTransactionHandle.create("foo"); Node newNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN); nodeManager.addNode("foo", newNode); ImmutableList.Builder<Split> initialSplits = ImmutableList.builder(); for (int i = 0; i < 10; i++) { initialSplits.add(new Split("foo", transactionHandle, new TestSplitRemote())); } MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor); // Max out number of splits on node TaskId taskId1 = new TaskId(new StageId("test", "1"), "1"); RemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(taskId1, newNode, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId1)); nodeTaskMap.addTask(newNode, remoteTask1); TaskId taskId2 = new TaskId(new StageId("test", "1"), "2"); RemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(taskId2, newNode, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId2)); nodeTaskMap.addTask(newNode, remoteTask2); Set<Split> splits = new HashSet<>(); for (int i = 0; i < 5; i++) { splits.add(new Split("foo", transactionHandle, new TestSplitRemote())); } Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())); // no split should be assigned to the newNode, as it already has maxNodeSplits assigned to it assertFalse(assignments.keySet().contains(newNode)); remoteTask1.abort(); remoteTask2.abort(); assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(newNode), 0); }
/**
 * Wraps the nodes of the map in a {@code ResettableRandomizedIterator}.
 *
 * @param nodeMap source of the nodes and of the coordinator node ids
 * @param includeCoordinator when {@code false}, nodes whose identifier is among
 *        the coordinator node ids are left out
 */
public static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator)
{
    ImmutableList.Builder<Node> eligible = ImmutableList.builder();
    for (Node candidate : nodeMap.getNodesByHostAndPort().values()) {
        boolean excluded = !includeCoordinator && nodeMap.getCoordinatorNodeIds().contains(candidate.getNodeIdentifier());
        if (!excluded) {
            eligible.add(candidate);
        }
    }
    ImmutableList<Node> nodes = eligible.build();
    return new ResettableRandomizedIterator<>(nodes);
}
/**
 * Initializes the statistics: every node of the map starts with an assignment
 * count of zero, and the queued partitioned split counts of the existing tasks
 * are summed per node id.
 *
 * @param nodeTaskMap must not be null
 * @param nodeMap nodes to pre-populate the assignment counts for
 * @param existingTasks tasks whose queued split counts seed the per-node totals
 */
public NodeAssignmentStats(NodeTaskMap nodeTaskMap, NodeMap nodeMap, List<RemoteTask> existingTasks)
{
    this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");

    // pre-populate the assignment counts with zeros. This makes getOrDefault() faster
    for (Node known : nodeMap.getNodesByHostAndPort().values()) {
        assignmentCount.put(known, 0);
    }

    // accumulate queued splits per node across all existing tasks
    for (RemoteTask existing : existingTasks) {
        queuedSplitCountByNode.merge(existing.getNodeId(), existing.getQueuedPartitionedSplitCount(), Integer::sum);
    }
}
/**
 * Stores the given lookup structures as-is; no copies or null checks are made.
 *
 * @param nodesByHostAndPort nodes keyed by host address
 * @param nodesByHost nodes keyed by inet address
 * @param workersByNetworkPath nodes keyed by network location
 * @param coordinatorNodeIds coordinator node ids
 */
public NodeMap(
        SetMultimap<HostAddress, Node> nodesByHostAndPort,
        SetMultimap<InetAddress, Node> nodesByHost,
        SetMultimap<NetworkLocation, Node> workersByNetworkPath,
        Set<String> coordinatorNodeIds)
{
    this.coordinatorNodeIds = coordinatorNodeIds;
    this.workersByNetworkPath = workersByNetworkPath;
    this.nodesByHost = nodesByHost;
    this.nodesByHostAndPort = nodesByHostAndPort;
}
/**
 * Repeatedly computes assignments for a batch of pending splits, hands the
 * assigned splits to the matching mock tasks, refills the batch up to
 * {@code SPLIT_BATCH_SIZE}, and lets the next task in a round-robin cycle
 * finish some of its splits. Runs until all splits are consumed and the batch
 * is empty.
 *
 * @return the list of remote tasks used for the assignment computations
 */
@Benchmark
@OperationsPerInvocation(SPLITS)
public Object benchmark(BenchmarkData data)
        throws Throwable
{
    List<RemoteTask> remoteTasks = ImmutableList.copyOf(data.getTaskMap().values());
    Iterator<MockRemoteTaskFactory.MockRemoteTask> roundRobin = Iterators.cycle(data.getTaskMap().values());
    Iterator<Split> remaining = data.getSplits().iterator();
    Set<Split> pending = new HashSet<>();

    while (remaining.hasNext() || !pending.isEmpty()) {
        Multimap<Node, Split> assignments = data.getNodeSelector().computeAssignments(pending, remoteTasks);

        // deliver each node's splits to its task and start them
        assignments.asMap().forEach((node, nodeSplits) -> {
            MockRemoteTaskFactory.MockRemoteTask task = data.getTaskMap().get(node);
            task.addSplits(new PlanNodeId("sourceId"), nodeSplits);
            task.startSplits(MAX_SPLITS_PER_NODE);
        });

        // drop what was assigned; clear() is the shortcut when everything was
        if (assignments.size() != pending.size()) {
            pending.removeAll(assignments.values());
        }
        else {
            pending.clear();
        }

        // top the batch back up
        while (pending.size() < SPLIT_BATCH_SIZE && remaining.hasNext()) {
            pending.add(remaining.next());
        }

        roundRobin.next().finishSplits((int) Math.ceil(MAX_SPLITS_PER_NODE / 50.0));
    }

    return remoteTasks;
}
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment) { ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder(); for (Entry<Node, Collection<Split>> taskSplits : splitAssignment.asMap().entrySet()) { // source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution newTasks.addAll(stage.scheduleSplits(taskSplits.getKey(), BROADCAST_PARTITION_ID, taskSplits.getValue())); } return newTasks.build(); }
/**
 * Returns the nodes currently in the given state.
 *
 * @param state one of {@code ACTIVE}, {@code INACTIVE} or {@code SHUTTING_DOWN}
 * @throws IllegalArgumentException for any other state
 */
@Override
public Set<Node> getNodes(NodeState state)
{
    Set<Node> matching;
    switch (state) {
        case ACTIVE:
            matching = getAllNodes().getActiveNodes();
            break;
        case INACTIVE:
            matching = getAllNodes().getInactiveNodes();
            break;
        case SHUTTING_DOWN:
            matching = getAllNodes().getShuttingDownNodes();
            break;
        default:
            throw new IllegalArgumentException("Unknown node state " + state);
    }
    return matching;
}
@BeforeMethod public void setUp() throws Exception { finalizerService = new FinalizerService(); nodeTaskMap = new NodeTaskMap(finalizerService); nodeManager = new InMemoryNodeManager(); ImmutableList.Builder<Node> nodeBuilder = ImmutableList.builder(); nodeBuilder.add(new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN)); nodeBuilder.add(new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN)); nodeBuilder.add(new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN)); ImmutableList<Node> nodes = nodeBuilder.build(); nodeManager.addNode("foo", nodes); NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig() .setMaxSplitsPerNode(20) .setIncludeCoordinator(false) .setMaxPendingSplitsPerNodePerTask(10); NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap); // contents of taskMap indicate the node-task map for the current stage taskMap = new HashMap<>(); nodeSelector = nodeScheduler.createNodeSelector("foo"); remoteTaskExecutor = Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s")); finalizerService.start(); }
/**
 * A single local split must produce exactly one assignment, placed on the
 * node whose host and port equal the split's first address.
 */
@Test
public void testScheduleLocal()
        throws Exception
{
    Split localSplit = new Split("foo", TestingTransactionHandle.create("test"), new TestSplitLocal());
    Set<Split> pending = ImmutableSet.of(localSplit);

    Map.Entry<Node, Split> onlyAssignment = Iterables.getOnlyElement(
            nodeSelector.computeAssignments(pending, ImmutableList.copyOf(taskMap.values())).entries());

    Node assignedNode = onlyAssignment.getKey();
    Split assignedSplit = onlyAssignment.getValue();
    assertEquals(assignedNode.getHostAndPort(), localSplit.getAddresses().get(0));
    assertEquals(assignedSplit, localSplit);
}
/**
 * Creates an {@code HttpRemoteTask} for the node, using the node's identifier
 * and the task location produced by the location factory, together with this
 * factory's HTTP client, executors, timing settings and codecs.
 */
@Override
public RemoteTask createRemoteTask(
        Session session,
        TaskId taskId,
        Node node,
        int partition,
        PlanFragment fragment,
        Multimap<PlanNodeId, Split> initialSplits,
        OutputBuffers outputBuffers,
        PartitionedSplitCountTracker partitionedSplitCountTracker)
{
    String nodeId = node.getNodeIdentifier();
    URI taskLocation = locationFactory.createTaskLocation(node, taskId);

    return new HttpRemoteTask(
            session,
            taskId,
            nodeId,
            partition,
            taskLocation,
            fragment,
            initialSplits,
            outputBuffers,
            httpClient,
            executor,
            errorScheduledExecutor,
            minErrorDuration,
            taskInfoRefreshMaxWait,
            taskInfoCodec,
            taskUpdateRequestCodec,
            partitionedSplitCountTracker);
}
/**
 * Builds the task URI by appending {@code /v1/task} and the task id to the
 * node's HTTP URI.
 *
 * @param node node hosting the task, must not be null
 * @param taskId id of the task, must not be null
 */
@Override
public URI createTaskLocation(Node node, TaskId taskId)
{
    requireNonNull(node, "node is null");
    requireNonNull(taskId, "taskId is null");

    URI nodeUri = node.getHttpUri();
    String taskSegment = taskId.toString();
    return uriBuilderFrom(nodeUri)
            .appendPath("/v1/task")
            .appendPath(taskSegment)
            .build();
}
/**
 * Builds the memory-info URI by appending {@code /v1/memory} to the node's
 * HTTP URI.
 *
 * @param node node to address, must not be null
 */
@Override
public URI createMemoryInfoLocation(Node node)
{
    requireNonNull(node, "node is null");

    URI nodeUri = node.getHttpUri();
    return uriBuilderFrom(nodeUri)
            .appendPath("/v1/memory")
            .build();
}
/**
 * Creates a {@code MockRemoteTask} for the node's identifier. The session and
 * output buffers are accepted for interface compatibility but not used.
 */
@Override
public MockRemoteTask createRemoteTask(
        Session session,
        TaskId taskId,
        Node node,
        int partition,
        PlanFragment fragment,
        Multimap<PlanNodeId, Split> initialSplits,
        OutputBuffers outputBuffers,
        PartitionedSplitCountTracker partitionedSplitCountTracker)
{
    String nodeId = node.getNodeIdentifier();
    return new MockRemoteTask(
            taskId,
            fragment,
            nodeId,
            partition,
            executor,
            initialSplits,
            partitionedSplitCountTracker);
}
/**
 * Creates {@code count} nodes keyed by their index {@code 0..count-1}. Node
 * {@code i} has the identifier {@code "other i"}; all nodes share the same URI.
 *
 * @param count number of nodes to create
 * @return an immutable map from index to node
 */
private static Map<Integer, Node> generateRandomNodes(int count)
{
    ImmutableMap.Builder<Integer, Node> byIndex = ImmutableMap.builder();
    int index = 0;
    while (index < count) {
        Node created = new PrestoNode("other " + index, URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN);
        byIndex.put(index, created);
        index++;
    }
    return byIndex.build();
}