Java 类com.facebook.presto.spi.Node 实例源码
项目:presto
文件:RaptorSplitManager.java
private ConnectorSplit createSplit(ShardNodes shard)
{
UUID shardId = shard.getShardUuid();
Collection<String> nodeIds = shard.getNodeIdentifiers();
List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds);
if (addresses.isEmpty()) {
if (!backupAvailable) {
throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds));
}
// Pick a random node and optimistically assign the shard to it.
// That node will restore the shard from the backup location.
Set<Node> availableNodes = nodeSupplier.getWorkerNodes();
if (availableNodes.isEmpty()) {
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
Node node = selectRandom(availableNodes);
shardManager.assignShard(tableId, shardId, node.getNodeIdentifier());
addresses = ImmutableList.of(node.getHostAndPort());
}
return new RaptorSplit(connectorId, shardId, addresses, effectivePredicate, transactionId);
}
项目:presto
文件:StandaloneQueryRunner.java
private void refreshNodes(String catalogName)
{
Set<Node> activeNodesWithConnector;
do {
try {
MILLISECONDS.sleep(10);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
activeNodesWithConnector = server.getActiveNodesWithConnector(catalogName);
}
while (activeNodesWithConnector.isEmpty());
}
项目:presto
文件:TestJmxSplitManager.java
@Test
public void testPredicatePushdown()
throws Exception
{
for (Node node : nodes) {
String nodeIdentifier = node.getNodeIdentifier();
TupleDomain<ColumnHandle> nodeTupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(columnHandle, NullableValue.of(VARCHAR, utf8Slice(nodeIdentifier))));
ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, nodeTupleDomain);
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout);
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertEquals(allSplits.size(), 1);
assertEquals(allSplits.get(0).getAddresses().size(), 1);
assertEquals(allSplits.get(0).getAddresses().get(0).getHostText(), nodeIdentifier);
}
}
项目:presto
文件:TestJmxSplitManager.java
@Test
public void testNoPredicate()
throws Exception
{
ConnectorTableLayoutHandle layout = new JmxTableLayoutHandle(tableHandle, TupleDomain.all());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, layout);
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertEquals(allSplits.size(), nodes.size());
Set<String> actualNodes = nodes.stream().map(Node::getNodeIdentifier).collect(toSet());
Set<String> expectedNodes = new HashSet<>();
for (ConnectorSplit split : allSplits) {
List<HostAddress> addresses = split.getAddresses();
assertEquals(addresses.size(), 1);
expectedNodes.add(addresses.get(0).getHostText());
}
assertEquals(actualNodes, expectedNodes);
}
项目:presto
文件:MemoryTrackingRemoteTaskFactory.java
@Override
public RemoteTask createRemoteTask(Session session,
TaskId taskId,
Node node,
int partition,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
PartitionedSplitCountTracker partitionedSplitCountTracker)
{
RemoteTask task = remoteTaskFactory.createRemoteTask(session,
taskId,
node,
partition,
fragment,
initialSplits,
outputBuffers,
partitionedSplitCountTracker);
task.addStateChangeListener(new UpdatePeakMemory(stateMachine));
return task;
}
项目:presto
文件:SqlStageExecution.java
public synchronized Set<RemoteTask> scheduleSplits(Node node, int partition, Iterable<Split> splits)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");
PlanNodeId partitionedSource = stateMachine.getFragment().getPartitionedSource();
checkState(partitionedSource != null, "Partitioned source is null");
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
Collection<RemoteTask> tasks = this.tasks.get(node);
if (tasks == null) {
newTasks.add(scheduleTask(node, partition, partitionedSource, splits));
}
else {
RemoteTask task = tasks.iterator().next();
task.addSplits(partitionedSource, splits);
}
return newTasks.build();
}
项目:presto
文件:TestNodeScheduler.java
@Test
public void testTaskCompletion()
throws Exception
{
MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor);
Node chosenNode = Iterables.get(nodeManager.getActiveDatasourceNodes("foo"), 0);
TaskId taskId = new TaskId(new StageId("test", "1"), "1");
RemoteTask remoteTask = remoteTaskFactory.createTableScanTask(
taskId,
chosenNode,
ImmutableList.of(new Split("foo", TestingTransactionHandle.create("foo"), new TestSplitRemote())),
nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId));
nodeTaskMap.addTask(chosenNode, remoteTask);
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 1);
remoteTask.abort();
MILLISECONDS.sleep(100); // Sleep until cache expires
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 0);
remoteTask.abort();
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 0);
}
项目:presto
文件:NodeScheduler.java
public static List<Node> selectNodes(int limit, Iterator<Node> candidates, boolean doubleScheduling)
{
checkArgument(limit > 0, "limit must be at least 1");
List<Node> selected = new ArrayList<>(limit);
while (selected.size() < limit && candidates.hasNext()) {
selected.add(candidates.next());
}
if (doubleScheduling && !selected.isEmpty()) {
// Cycle the nodes until we reach the limit
int uniqueNodes = selected.size();
int i = 0;
while (selected.size() < limit) {
if (i >= uniqueNodes) {
i = 0;
}
selected.add(selected.get(i));
i++;
}
}
return selected;
}
项目:presto
文件:TpchSplitManager.java
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
TpchTableHandle tableHandle = checkType(layout, TpchTableLayoutHandle.class, "layout").getTable();
Set<Node> nodes = nodeManager.getActiveDatasourceNodes(connectorId);
checkState(!nodes.isEmpty(), "No TPCH nodes available");
int totalParts = nodes.size() * splitsPerNode;
int partNumber = 0;
// Split the data using split and skew by the number of nodes available.
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
for (Node node : nodes) {
for (int i = 0; i < splitsPerNode; i++) {
splits.add(new TpchSplit(tableHandle, partNumber, totalParts, ImmutableList.of(node.getHostAndPort())));
partNumber++;
}
}
return new FixedSplitSource(connectorId, splits.build());
}
项目:presto
文件:TopologyAwareNodeSelector.java
@Nullable
private Node bestNodeSplitCount(Iterator<Node> candidates, int minCandidatesWhenFull, int maxSplitsPerNodePerTaskWhenFull, NodeAssignmentStats assignmentStats)
{
Node bestQueueNotFull = null;
int min = Integer.MAX_VALUE;
int fullCandidatesConsidered = 0;
while (candidates.hasNext() && (fullCandidatesConsidered < minCandidatesWhenFull || bestQueueNotFull == null)) {
Node node = candidates.next();
if (assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode) {
return node;
}
fullCandidatesConsidered++;
int totalSplitCount = assignmentStats.getTotalQueuedSplitCount(node);
if (totalSplitCount < min && totalSplitCount < maxSplitsPerNodePerTaskWhenFull) {
bestQueueNotFull = node;
}
}
return bestQueueNotFull;
}
项目:presto
文件:SourcePartitionedScheduler.java
private Set<RemoteTask> finalizeTaskCreationIfNecessary()
{
// only lock down tasks if there is a sub stage that could block waiting for this stage to create all tasks
if (stage.getFragment().isLeaf()) {
return ImmutableSet.of();
}
splitPlacementPolicy.lockDownNodes();
Set<Node> scheduledNodes = stage.getScheduledNodes();
Set<RemoteTask> newTasks = splitPlacementPolicy.allNodes().stream()
.filter(node -> !scheduledNodes.contains(node))
.map(node -> stage.scheduleTask(node, BROADCAST_PARTITION_ID))
.collect(toImmutableSet());
// notify listeners that we have scheduled all tasks so they can set no more buffers or exchange splits
stage.transitionToSchedulingSplits();
return newTasks;
}
项目:presto
文件:MockRemoteTaskFactory.java
public MockRemoteTask createTableScanTask(TaskId taskId, Node newNode, List<Split> splits, PartitionedSplitCountTracker partitionedSplitCountTracker)
{
Symbol symbol = new Symbol("column");
PlanNodeId sourceId = new PlanNodeId("sourceId");
PlanFragment testFragment = new PlanFragment(
new PlanFragmentId("test"),
new TableScanNode(
sourceId,
new TableHandle("test", new TestingTableHandle()),
ImmutableList.of(symbol),
ImmutableMap.of(symbol, new TestingColumnHandle("column")),
Optional.empty(),
TupleDomain.all(),
null),
ImmutableMap.<Symbol, Type>of(symbol, VARCHAR),
ImmutableList.of(symbol),
SOURCE,
sourceId,
Optional.empty());
ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
for (Split sourceSplit : splits) {
initialSplits.put(sourceId, sourceSplit);
}
return createRemoteTask(TEST_SESSION, taskId, newNode, 0, testFragment, initialSplits.build(), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS, partitionedSplitCountTracker);
}
项目:presto
文件:TestFixedCountScheduler.java
@Test
public void testSingleNode()
throws Exception
{
FixedCountScheduler nodeScheduler = new FixedCountScheduler(
(node, partition) -> taskFactory.createTableScanTask(
new TaskId(new StageId("test", "1"), "1"),
(Node) node, ImmutableList.of(),
new PartitionedSplitCountTracker(delta -> { })),
generateRandomNodes(1));
ScheduleResult result = nodeScheduler.schedule();
assertTrue(result.isFinished());
assertTrue(result.getBlocked().isDone());
assertEquals(result.getNewTasks().size(), 1);
result.getNewTasks().iterator().next().getNodeId().equals("other 0");
}
项目:presto
文件:TestFixedCountScheduler.java
@Test
public void testMultipleNodes()
throws Exception
{
FixedCountScheduler nodeScheduler = new FixedCountScheduler(
(node, partition) -> taskFactory.createTableScanTask(
new TaskId(new StageId("test", "1"), "1"),
(Node) node, ImmutableList.of(),
new PartitionedSplitCountTracker(delta -> { })),
generateRandomNodes(5));
ScheduleResult result = nodeScheduler.schedule();
assertTrue(result.isFinished());
assertTrue(result.getBlocked().isDone());
assertEquals(result.getNewTasks().size(), 5);
assertEquals(result.getNewTasks().stream().map(RemoteTask::getNodeId).collect(toImmutableSet()).size(), 5);
}
项目:presto-rest
文件:RestSplitManager.java
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
RestConnectorTableLayoutHandle layoutHandle = Types.checkType(layout, RestConnectorTableLayoutHandle.class, "layout");
List<HostAddress> addresses = nodeManager.getRequiredWorkerNodes().stream()
.map(Node::getHostAndPort)
.collect(toList());
return new FixedSplitSource(ImmutableList.of(
new RestConnectorSplit(layoutHandle.getTableHandle(), addresses)));
}
项目:presto
文件:RaptorSplitManager.java
private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
{
ImmutableList.Builder<HostAddress> nodes = ImmutableList.builder();
for (String id : nodeIdentifiers) {
Node node = nodeMap.get(id);
if (node != null) {
nodes.add(node.getHostAndPort());
}
}
return nodes.build();
}
项目:presto
文件:TestShardEjector.java
private static NodeManager createNodeManager(String current, String... others)
{
Node currentNode = new TestingNode(current);
ImmutableSet.Builder<Node> nodes = ImmutableSet.builder();
nodes.add(currentNode);
for (String other : others) {
nodes.add(new TestingNode(other));
}
return new TestingNodeManager(nodes.build(), currentNode);
}
项目:presto
文件:TestNodeScheduler.java
@Test
public void testSplitCount()
throws Exception
{
MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor);
Node chosenNode = Iterables.get(nodeManager.getActiveDatasourceNodes("foo"), 0);
TaskId taskId1 = new TaskId(new StageId("test", "1"), "1");
RemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(taskId1,
chosenNode,
ImmutableList.of(new Split("foo", TestingTransactionHandle.create("foo"), new TestSplitRemote()), new Split("bar", TestingTransactionHandle.create("bar"), new TestSplitRemote())),
nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId1));
TaskId taskId2 = new TaskId(new StageId("test", "1"), "2");
RemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(
taskId2,
chosenNode,
ImmutableList.of(new Split("foo2", TestingTransactionHandle.create("foo2"), new TestSplitRemote())),
nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId2));
nodeTaskMap.addTask(chosenNode, remoteTask1);
nodeTaskMap.addTask(chosenNode, remoteTask2);
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 3);
remoteTask1.abort();
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 1);
remoteTask2.abort();
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), 0);
}
项目:presto
文件:DistributedQueryRunner.java
private boolean isConnectionVisibleToAllNodes(String connectorId)
{
for (TestingPrestoServer server : servers) {
server.refreshNodes();
Set<Node> activeNodesWithConnector = server.getActiveNodesWithConnector(connectorId);
if (activeNodesWithConnector.size() != servers.size()) {
return false;
}
}
return true;
}
项目:presto
文件:BenchmarkNodeScheduler.java
@Setup
public void setup()
throws NoSuchMethodException, IllegalAccessException
{
TestingTransactionHandle transactionHandle = TestingTransactionHandle.create("foo");
finalizerService.start();
NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
ImmutableList.Builder<Node> nodeBuilder = ImmutableList.builder();
for (int i = 0; i < NODES; i++) {
nodeBuilder.add(new PrestoNode("node" + i, URI.create("http://" + addressForHost(i).getHostText()), NodeVersion.UNKNOWN));
}
List<Node> nodes = nodeBuilder.build();
MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s")));
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(i);
ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
for (int j = 0; j < MAX_SPLITS_PER_NODE + MAX_PENDING_SPLITS_PER_TASK_PER_NODE; j++) {
initialSplits.add(new Split("foo", transactionHandle, new TestSplitRemote(i)));
}
TaskId taskId = new TaskId(new StageId("test", "1"), String.valueOf(i));
MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
nodeTaskMap.addTask(node, remoteTask);
taskMap.put(node, remoteTask);
}
for (int i = 0; i < SPLITS; i++) {
splits.add(new Split("foo", transactionHandle, new TestSplitRemote(ThreadLocalRandom.current().nextInt(DATA_NODES))));
}
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode("foo", nodes);
NodeScheduler nodeScheduler = new NodeScheduler(getNetworkTopology(), nodeManager, getNodeSchedulerConfig(), nodeTaskMap);
nodeSelector = nodeScheduler.createNodeSelector("foo");
}
项目:presto
文件:SystemSplitManager.java
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
{
SystemTableLayoutHandle layoutHandle = checkType(layout, SystemTableLayoutHandle.class, "layout");
SystemTableHandle tableHandle = layoutHandle.getTable();
TupleDomain<ColumnHandle> constraint = layoutHandle.getConstraint();
SystemTable systemTable = tables.get(tableHandle.getSchemaTableName());
Distribution tableDistributionMode = systemTable.getDistribution();
if (tableDistributionMode == SINGLE_COORDINATOR) {
HostAddress address = nodeManager.getCurrentNode().getHostAndPort();
ConnectorSplit split = new SystemSplit(tableHandle.getConnectorId(), tableHandle, address, constraint);
return new FixedSplitSource(GlobalSystemConnector.NAME, ImmutableList.of(split));
}
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
ImmutableSet.Builder<Node> nodes = ImmutableSet.builder();
if (tableDistributionMode == ALL_COORDINATORS) {
nodes.addAll(nodeManager.getCoordinators());
}
else if (tableDistributionMode == ALL_NODES) {
nodes.addAll(nodeManager.getNodes(ACTIVE));
}
Set<Node> nodeSet = nodes.build();
for (Node node : nodeSet) {
splits.add(new SystemSplit(tableHandle.getConnectorId(), tableHandle, node.getHostAndPort(), constraint));
}
return new FixedSplitSource(GlobalSystemConnector.NAME, splits.build());
}
项目:presto
文件:NodeSystemTable.java
private static String getNodeVersion(Node node)
{
if (node instanceof PrestoNode) {
return ((PrestoNode) node).getNodeVersion().toString();
}
return "";
}
项目:presto
文件:TestNodeScheduler.java
@Test
public void testMultipleTasksPerNode()
{
NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
.setMaxSplitsPerNode(20)
.setIncludeCoordinator(false)
.setMaxPendingSplitsPerNodePerTask(10);
NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
NodeSelector nodeSelector = nodeScheduler.createNodeSelector("foo");
List<Node> nodes = nodeSelector.selectRandomNodes(10);
assertEquals(nodes.size(), 3);
nodeSchedulerConfig.setMultipleTasksPerNodeEnabled(true);
nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
nodeSelector = nodeScheduler.createNodeSelector("foo");
nodes = nodeSelector.selectRandomNodes(9);
assertEquals(nodes.size(), 9);
Map<String, Integer> counts = new HashMap<>();
for (Node node : nodes) {
Integer value = counts.get(node.getNodeIdentifier());
counts.put(node.getNodeIdentifier(), (value == null ? 0 : value) + 1);
}
assertEquals(counts.get("other1").intValue(), 3);
assertEquals(counts.get("other2").intValue(), 3);
assertEquals(counts.get("other3").intValue(), 3);
}
项目:presto
文件:NodeTaskMap.java
private NodeTasks createOrGetNodeTasks(Node node)
{
NodeTasks nodeTasks = nodeTasksMap.get(node);
if (nodeTasks == null) {
nodeTasks = addNodeTask(node);
}
return nodeTasks;
}
项目:presto
文件:NodeTaskMap.java
private NodeTasks addNodeTask(Node node)
{
NodeTasks newNodeTasks = new NodeTasks(finalizerService);
NodeTasks nodeTasks = nodeTasksMap.putIfAbsent(node, newNodeTasks);
if (nodeTasks == null) {
return newNodeTasks;
}
return nodeTasks;
}
项目:presto
文件:TestNodeScheduler.java
@Test
public void testMaxSplitsPerNode()
throws Exception
{
TestingTransactionHandle transactionHandle = TestingTransactionHandle.create("foo");
Node newNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN);
nodeManager.addNode("foo", newNode);
ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
for (int i = 0; i < 10; i++) {
initialSplits.add(new Split("foo", transactionHandle, new TestSplitRemote()));
}
MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor);
// Max out number of splits on node
TaskId taskId1 = new TaskId(new StageId("test", "1"), "1");
RemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(taskId1, newNode, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId1));
nodeTaskMap.addTask(newNode, remoteTask1);
TaskId taskId2 = new TaskId(new StageId("test", "1"), "2");
RemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(taskId2, newNode, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(newNode, taskId2));
nodeTaskMap.addTask(newNode, remoteTask2);
Set<Split> splits = new HashSet<>();
for (int i = 0; i < 5; i++) {
splits.add(new Split("foo", transactionHandle, new TestSplitRemote()));
}
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values()));
// no split should be assigned to the newNode, as it already has maxNodeSplits assigned to it
assertFalse(assignments.keySet().contains(newNode));
remoteTask1.abort();
remoteTask2.abort();
assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(newNode), 0);
}
项目:presto
文件:NodeScheduler.java
public static ResettableRandomizedIterator<Node> randomizedNodes(NodeMap nodeMap, boolean includeCoordinator)
{
ImmutableList<Node> nodes = nodeMap.getNodesByHostAndPort().values().stream()
.filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
.collect(toImmutableList());
return new ResettableRandomizedIterator<>(nodes);
}
项目:presto
文件:NodeAssignmentStats.java
public NodeAssignmentStats(NodeTaskMap nodeTaskMap, NodeMap nodeMap, List<RemoteTask> existingTasks)
{
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
// pre-populate the assignment counts with zeros. This makes getOrDefault() faster
for (Node node : nodeMap.getNodesByHostAndPort().values()) {
assignmentCount.put(node, 0);
}
for (RemoteTask task : existingTasks) {
String nodeId = task.getNodeId();
queuedSplitCountByNode.put(nodeId, queuedSplitCountByNode.getOrDefault(nodeId, 0) + task.getQueuedPartitionedSplitCount());
}
}
项目:presto
文件:NodeMap.java
public NodeMap(SetMultimap<HostAddress, Node> nodesByHostAndPort,
SetMultimap<InetAddress, Node> nodesByHost,
SetMultimap<NetworkLocation, Node> workersByNetworkPath,
Set<String> coordinatorNodeIds)
{
this.nodesByHostAndPort = nodesByHostAndPort;
this.nodesByHost = nodesByHost;
this.workersByNetworkPath = workersByNetworkPath;
this.coordinatorNodeIds = coordinatorNodeIds;
}
项目:presto
文件:BenchmarkNodeScheduler.java
@Benchmark
@OperationsPerInvocation(SPLITS)
public Object benchmark(BenchmarkData data)
throws Throwable
{
List<RemoteTask> remoteTasks = ImmutableList.copyOf(data.getTaskMap().values());
Iterator<MockRemoteTaskFactory.MockRemoteTask> finishingTask = Iterators.cycle(data.getTaskMap().values());
Iterator<Split> splits = data.getSplits().iterator();
Set<Split> batch = new HashSet<>();
while (splits.hasNext() || !batch.isEmpty()) {
Multimap<Node, Split> assignments = data.getNodeSelector().computeAssignments(batch, remoteTasks);
for (Node node : assignments.keySet()) {
MockRemoteTaskFactory.MockRemoteTask remoteTask = data.getTaskMap().get(node);
remoteTask.addSplits(new PlanNodeId("sourceId"), assignments.get(node));
remoteTask.startSplits(MAX_SPLITS_PER_NODE);
}
if (assignments.size() == batch.size()) {
batch.clear();
}
else {
batch.removeAll(assignments.values());
}
while (batch.size() < SPLIT_BATCH_SIZE && splits.hasNext()) {
batch.add(splits.next());
}
finishingTask.next().finishSplits((int) Math.ceil(MAX_SPLITS_PER_NODE / 50.0));
}
return remoteTasks;
}
项目:presto
文件:SourcePartitionedScheduler.java
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment)
{
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
for (Entry<Node, Collection<Split>> taskSplits : splitAssignment.asMap().entrySet()) {
// source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
newTasks.addAll(stage.scheduleSplits(taskSplits.getKey(), BROADCAST_PARTITION_ID, taskSplits.getValue()));
}
return newTasks.build();
}
项目:presto
文件:InMemoryNodeManager.java
@Override
public Set<Node> getNodes(NodeState state)
{
switch (state) {
case ACTIVE:
return getAllNodes().getActiveNodes();
case INACTIVE:
return getAllNodes().getInactiveNodes();
case SHUTTING_DOWN:
return getAllNodes().getShuttingDownNodes();
default:
throw new IllegalArgumentException("Unknown node state " + state);
}
}
项目:presto
文件:TestNodeScheduler.java
@BeforeMethod
public void setUp()
throws Exception
{
finalizerService = new FinalizerService();
nodeTaskMap = new NodeTaskMap(finalizerService);
nodeManager = new InMemoryNodeManager();
ImmutableList.Builder<Node> nodeBuilder = ImmutableList.builder();
nodeBuilder.add(new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN));
nodeBuilder.add(new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN));
nodeBuilder.add(new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN));
ImmutableList<Node> nodes = nodeBuilder.build();
nodeManager.addNode("foo", nodes);
NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
.setMaxSplitsPerNode(20)
.setIncludeCoordinator(false)
.setMaxPendingSplitsPerNodePerTask(10);
NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, nodeSchedulerConfig, nodeTaskMap);
// contents of taskMap indicate the node-task map for the current stage
taskMap = new HashMap<>();
nodeSelector = nodeScheduler.createNodeSelector("foo");
remoteTaskExecutor = Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s"));
finalizerService.start();
}
项目:presto
文件:TestNodeScheduler.java
@Test
public void testScheduleLocal()
throws Exception
{
Split split = new Split("foo", TestingTransactionHandle.create("test"), new TestSplitLocal());
Set<Split> splits = ImmutableSet.of(split);
Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).entries());
assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
assertEquals(assignment.getValue(), split);
}
项目:presto
文件:DiscoveryNodeManager.java
@Override
public Set<Node> getNodes(NodeState state)
{
switch (state) {
case ACTIVE:
return getAllNodes().getActiveNodes();
case INACTIVE:
return getAllNodes().getInactiveNodes();
case SHUTTING_DOWN:
return getAllNodes().getShuttingDownNodes();
default:
throw new IllegalArgumentException("Unknown node state " + state);
}
}
项目:presto
文件:HttpRemoteTaskFactory.java
@Override
public RemoteTask createRemoteTask(Session session,
TaskId taskId,
Node node,
int partition,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
PartitionedSplitCountTracker partitionedSplitCountTracker)
{
return new HttpRemoteTask(session,
taskId,
node.getNodeIdentifier(),
partition,
locationFactory.createTaskLocation(node, taskId),
fragment,
initialSplits,
outputBuffers,
httpClient,
executor,
errorScheduledExecutor,
minErrorDuration,
taskInfoRefreshMaxWait,
taskInfoCodec,
taskUpdateRequestCodec,
partitionedSplitCountTracker
);
}
项目:presto
文件:HttpLocationFactory.java
@Override
public URI createTaskLocation(Node node, TaskId taskId)
{
requireNonNull(node, "node is null");
requireNonNull(taskId, "taskId is null");
return uriBuilderFrom(node.getHttpUri())
.appendPath("/v1/task")
.appendPath(taskId.toString())
.build();
}
项目:presto
文件:HttpLocationFactory.java
@Override
public URI createMemoryInfoLocation(Node node)
{
requireNonNull(node, "node is null");
return uriBuilderFrom(node.getHttpUri())
.appendPath("/v1/memory").build();
}
项目:presto
文件:MockRemoteTaskFactory.java
@Override
public MockRemoteTask createRemoteTask(
Session session,
TaskId taskId,
Node node,
int partition,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
PartitionedSplitCountTracker partitionedSplitCountTracker)
{
return new MockRemoteTask(taskId, fragment, node.getNodeIdentifier(), partition, executor, initialSplits, partitionedSplitCountTracker);
}
项目:presto
文件:TestFixedCountScheduler.java
private static Map<Integer, Node> generateRandomNodes(int count)
{
ImmutableMap.Builder<Integer, Node> nodes = ImmutableMap.builder();
for (int i = 0; i < count; i++) {
nodes.put(i, new PrestoNode("other " + i, URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN));
}
return nodes.build();
}