/**
 * Creates an {@link ITopic} proxy on the combination of a
 * {@link TransactionalQueue} and an actual {@link ITopic} instance. The proxy
 * will offer items to the transactional queue when they are published on the
 * topic. All other topic methods are simply passed through to the underlying
 * topic. By offering items to the queue on publish, a transactional topic can
 * be simulated via the ITopic interface.
 *
 * <p>Exceptions raised by the underlying topic are rethrown as-is rather than
 * wrapped in a reflection exception, so callers observe the same failures
 * they would see when calling the topic directly.
 *
 * @param <E> the type of items in the topic
 * @param queue the transactional queue to offer all published objects
 * @param topic the underlying topic to handle all other operations
 *
 * @return the proxy around the queue and topic
 */
@SuppressWarnings("unchecked")
public static <E> ITopic<E> createTopicProxy(
    final TransactionalQueue<E> queue, final ITopic<E> topic) {

  InvocationHandler handler = new InvocationHandler() {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      // Any method named "publish" is redirected to the transactional queue.
      if ("publish".equals(method.getName())) {
        return queue.offer((E) args[0]);
      }

      try {
        return method.invoke(topic, args);
      } catch (java.lang.reflect.InvocationTargetException ex) {
        // Method.invoke wraps whatever the target threw. Rethrow the
        // original so the proxy does not surface it as an
        // UndeclaredThrowableException.
        Throwable cause = ex.getCause();
        throw (cause != null) ? cause : ex;
      }
    }
  };

  return (ITopic<E>) Proxy.newProxyInstance(
      ITopic.class.getClassLoader(),
      new Class<?>[]{ITopic.class},
      handler);
}
/**
 * Runs one transactional offer/take cycle while holding {@code firstLock}.
 *
 * <p>Inside a new transaction the method offers {@code 1} to the queue named
 * {@code name + 'q'}, acquires and immediately releases {@code secondLock},
 * takes an item back from the queue and commits. A successful commit
 * increments {@code state.counter.committed}; any exception inside the
 * transaction triggers a rollback attempt. Exceptions escaping the
 * transaction handling are logged, and {@code firstLock} is always released.
 *
 * @param state the per-thread state whose counters are updated
 */
@TimeStep
public void timeStep(ThreadState state) {
  firstLock.lock();
  try {
    TransactionContext context = targetInstance.newTransactionContext();
    try {
      context.beginTransaction();

      TransactionalQueue<Integer> txnQueue = context.getQueue(name + 'q');
      txnQueue.offer(1);

      // Acquire and release the second lock while the transaction is open.
      secondLock.lock();
      secondLock.unlock();

      txnQueue.take();
      context.commitTransaction();
      state.counter.committed++;
    } catch (Exception txnFailure) {
      rollbackAfterFailure(context, state, txnFailure);
    }
  } catch (Exception outerFailure) {
    logger.fatal(name + ": outer Exception" + state.counter, outerFailure);
  } finally {
    firstLock.unlock();
  }
}

/**
 * Rolls back the given transaction after a failure and records the outcome.
 *
 * <p>A successful rollback increments {@code state.counter.rolled} and logs
 * the original failure; a failing rollback increments
 * {@code state.counter.failedRollbacks} and logs the rollback failure.
 *
 * @param context the transaction context to roll back
 * @param state the per-thread state whose counters are updated
 * @param txnFailure the exception that caused the rollback
 */
private void rollbackAfterFailure(
    TransactionContext context, ThreadState state, Exception txnFailure) {
  try {
    context.rollbackTransaction();
    state.counter.rolled++;
    logger.fatal(name + ": Exception in txn " + state.counter, txnFailure);
  } catch (Exception rollbackFailure) {
    state.counter.failedRollbacks++;
    logger.fatal(name + ": Exception in roll " + state.counter, rollbackFailure);
  }
}
/**
 * Starts a Hazelcast instance whose {@code QNAME} queue is backed by a
 * {@link MockQueueStore} and offers 2,000,000 integers to that queue, each in
 * its own {@code LOCAL} transaction.
 *
 * <p>Every 10,000 iterations the iteration count and the average elapsed
 * milliseconds per iteration so far are logged.
 */
private static void run() {
  final int totalItems = 2000000;
  final int logInterval = 10000;

  Config config = new Config("queueTest");
  QueueConfig queueConfig = config.getQueueConfig(QNAME);

  // Enable a queue store with the "memory-limit" property set to "0".
  QueueStoreConfig storeConfig = new QueueStoreConfig();
  storeConfig.setEnabled(true);
  storeConfig.setStoreImplementation(new MockQueueStore());
  storeConfig.getProperties().setProperty("memory-limit", "0");
  queueConfig.setQueueStoreConfig(storeConfig);

  HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(config);

  long startTime = System.currentTimeMillis();
  for (int i = 1; i <= totalItems; i++) {
    if (i % logInterval == 0) {
      double millisPerItem = (double) (System.currentTimeMillis() - startTime) / i;
      String average = String.format("%8.3f", millisPerItem);
      logger.info(Integer.toString(i) + "\t" + average);
    }

    TransactionOptions options = new TransactionOptions()
        .setTransactionType(TransactionOptions.TransactionType.LOCAL);
    TransactionContext context = hzInstance.newTransactionContext(options);

    context.beginTransaction();
    TransactionalQueue<Integer> queue = context.getQueue(QNAME);
    queue.offer(i);
    context.commitTransaction();
  }
}
/**
 * Creates an {@link IQueue} proxy around a {@link TransactionalQueue}. This
 * allows for common handling of queues regardless of if they are
 * transactional or not. Ideally Hazelcast's transactional queue would
 * directly implement IQueue but that isn't the case.
 *
 * <p>The proxy class is defined in the class loader of {@link IQueue} itself,
 * which is always able to see the proxied interface.
 *
 * @param <E> the type of objects in the queue
 * @param queue the transaction queue to create the proxy around
 *
 * @return the proxy to the transactional queue
 *
 * @throws NullPointerException if {@code queue} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <E> IQueue<E> createQueueProxy(TransactionalQueue<E> queue) {
  // Fail fast on a null queue instead of deferring the failure to the
  // first call made through the proxy.
  if (queue == null) {
    throw new NullPointerException("queue");
  }

  InvocationHandler handler =
      new TransactionalQueueInvocationHandler<>(queue);

  // Use the interface's own loader: the loader of the queue implementation
  // is not guaranteed to see IQueue, in which case newProxyInstance would
  // reject the interface with an IllegalArgumentException.
  return (IQueue<E>) Proxy.newProxyInstance(
      IQueue.class.getClassLoader(),
      new Class<?>[]{IQueue.class},
      handler);
}
/**
 * Creates a handler that stores the given transactional queue as its
 * delegate.
 *
 * @param queue the transactional queue to delegate to
 */
public TransactionalQueueInvocationHandler(TransactionalQueue<E> queue) {
  delegate = queue;
}