/**
 * Drains every statement that {@code queue.poll()} currently yields into a
 * single UNLOGGED batch, hands that batch to {@code executeQuery} together
 * with the current {@code System.nanoTime()} value, and adds the number of
 * drained statements to {@code metricCompleted}.
 *
 * <p>NOTE(review): the batch is executed even when the queue yields nothing;
 * confirm that an empty batch is acceptable to {@code executeQuery}.
 *
 * @param id identifier forwarded unchanged to {@code executeQuery}
 * @return the number of statements taken from the queue and added to the batch
 */
public int handleFlush_batch(String id) {
    final BatchStatement unloggedBatch = new BatchStatement(Type.UNLOGGED);
    int drained = 0;

    // poll() returning null marks the end of the currently queued statements
    for (Statement next = queue.poll(); next != null; next = queue.poll()) {
        drained++;
        unloggedBatch.add(next);
    }

    executeQuery(id, unloggedBatch, System.nanoTime());
    metricCompleted.inc(drained);
    return drained;
}
/**
 * Combines a single statement future with a future set of cascading
 * statements and resolves to one LOGGED batch containing all of them.
 *
 * <p>The two futures are first joined through {@code ListenableFutures.join}
 * (using {@code getExecutor()}); the resulting set is then transformed into a
 * {@code BatchStatement}.
 *
 * @param statementFuture          future of the primary statement
 * @param cascadingStatmentsFuture future of the cascading statements
 * @return a future resolving to a LOGGED batch of every joined statement
 */
protected ListenableFuture<Statement> mergeStatements(ListenableFuture<Statement> statementFuture,
                                                      ListenableFuture<ImmutableSet<Statement>> cascadingStatmentsFuture) {
    ListenableFuture<ImmutableSet<Statement>> joined =
            ListenableFutures.join(cascadingStatmentsFuture, statementFuture, getExecutor());

    Function<ImmutableSet<Statement>, Statement> toLoggedBatch =
            new Function<ImmutableSet<Statement>, Statement>() {
                @Override
                public Statement apply(ImmutableSet<Statement> statements) {
                    BatchStatement loggedBatch = new BatchStatement(Type.LOGGED);
                    loggedBatch.addAll(statements);
                    return loggedBatch;
                }
            };

    return Futures.transform(joined, toLoggedBatch);
}
/**
 * Builds the statement for this query asynchronously by asking every
 * {@code CounterMutation} in {@code batchables} for its own statement and
 * merging the results into one COUNTER batch via {@code mergeToBatch}.
 *
 * @param dbSession session passed to each mutation's {@code getStatementAsync}
 * @return a future of the merged COUNTER batch statement
 */
@Override
public ListenableFuture<Statement> getStatementAsync(final DBSession dbSession) {
    Function<CounterMutation, ListenableFuture<Statement>> perMutationStatement =
            new Function<CounterMutation, ListenableFuture<Statement>>() {
                @Override
                public ListenableFuture<Statement> apply(CounterMutation mutation) {
                    return mutation.getStatementAsync(dbSession);
                }
            };

    return mergeToBatch(Type.COUNTER, batchables.iterator(), perMutationStatement);
}
@Override protected void processRequest (final Request request) throws Exception { // check request this.checkRequest(request); // use the following request Object to process the request and set // the response to be returned RequestUnlikePost requestUnlikePost = (RequestUnlikePost)request.getRequestJsonBody(); // set post_id UUID postId = UUID.fromString(requestUnlikePost.post_id); // get like_time ResultSet resultSet = PostLikesTime.i().executeSyncSelect( postId, requestUnlikePost.getUserId() ); long likeTime = resultSet.one().getLong(PostLikesTime.kLikeTimeColumnName); // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // delete from ig_app_data.post_likes batchStatement.add( PostLikes.i().getBoundStatementDelete( postId, likeTime, requestUnlikePost.getUserId() ) ); // delete from ig_app_data.post_likes_time batchStatement.add( PostLikesTime.i().getBoundStatementDelete( postId, requestUnlikePost.getUserId() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // decrement post's likes count PostLikesCount.i().executeSyncDecrement(postId); // set response ((ResponseUnlikePost)request.getResponseBody() ).set( requestUnlikePost.request_tracking_id); }
@Override protected void processRequest (final Request request) throws Exception { // check request this.checkRequest(request); // use the following request Object to process the request and set // the response to be returned RequestUnfollow requestUnfollow = (RequestUnfollow)request.getRequestJsonBody(); // make unfollow_user_id UUID UUID unfollowUserId = UUID.fromString(requestUnfollow.unfollow_user_id); // get follow_time ResultSet resultSet = FollowingTime.i().executeSyncSelect( requestUnfollow.getUserId(), unfollowUserId); long followTime = resultSet.one().getLong(FollowingTime.kFollowingTimeColumnName); // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // delete from ig_app_data.following batchStatement.add( Following.i().getBoundStatementDelete( requestUnfollow.getUserId(), followTime, unfollowUserId) ); // delete from ig_app_data.following_time batchStatement.add( FollowingTime.i().getBoundStatementDelete( requestUnfollow.getUserId(), unfollowUserId) ); // delete from ig_app_data.follower batchStatement.add( Follower.i().getBoundStatementDelete( unfollowUserId, followTime, requestUnfollow.getUserId() ) ); // delete from ig_app_data.follower_time batchStatement.add( FollowerTime.i().getBoundStatementDelete( unfollowUserId, requestUnfollow.getUserId() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // set response ((ResponseUnfollow)request.getResponseBody() ).set( requestUnfollow.request_tracking_id); }
@Override protected void processRequest (final Request request) throws Exception { // use the following request Object to process the request and set // the response to be returned RequestComment requestComment = (RequestComment)request.getRequestJsonBody(); UUID postId = UUID.fromString(requestComment.post_id); // post doesn't exist? if (CheckersInl.postExists(postId) == false) { throw new BadRequestException( 401, 1, "Can't comment on a post that doesn't exist, post_id [" + requestComment.post_id + "]. Request issued by user_id [" + requestComment.user_id + "] from device_token [" + requestComment.device_token + "]", ExceptionClass.INVALID); } // NOTE: vos_instagram intentionally allows only one comment for each // user on a post // in case a user comments more than once on the same post, the newer // comment overwrites the older one // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); ResultSet resultSet = PostCommentsTime.i().executeSyncSelect( postId, requestComment.getUserId() ); // user commented on this post before? if (resultSet.isExhausted() == false) { long oldCommentTime = resultSet.one().getLong(PostCommentsTime.kCommentTimeColumnName); // delete old user's comment batchStatement.add( PostComments.i().getBoundStatementDelete( postId, oldCommentTime, requestComment.getUserId() ) ); } else { // first time for this user to comment on this post? 
increment comments // count PostCommentsCount.i().executeSyncIncrement(postId); } // insert into ig_app_data.post_comments batchStatement.add( PostComments.i().getBoundStatementInsert( postId, request.getStartTime(), requestComment.getUserId(), requestComment.comment) ); // insert into ig_app_data.post_comments_time batchStatement.add( PostCommentsTime.i().getBoundStatementInsert( postId, requestComment.getUserId(), request.getStartTime() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // set response ((ResponseComment)request.getResponseBody() ).set( requestComment.request_tracking_id, request.getStartTime() ); }
@Override protected void processRequest (final Request request) throws Exception { // use the following request Object to process the request and set // the response to be returned RequestDeleteComment requestDeleteComment = (RequestDeleteComment)request.getRequestJsonBody(); // get post comment's time UUID postId = UUID.fromString(requestDeleteComment.post_id); ResultSet resultSet = PostCommentsTime.i().executeSyncSelect( postId, requestDeleteComment.getUserId() ); // post/comment doesn't exist? if (resultSet.isExhausted() == true) { throw new BadRequestException( 402, 1, "User [" + requestDeleteComment.user_id + "] didn't comment on post [" + requestDeleteComment.post_id + "], then can't delete a comment that doesn't exist", ExceptionClass.INVALID); } // extract comment time long commentTime = resultSet.one().getLong(PostCommentsTime.kCommentTimeColumnName); // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // delete from ig_app_data.post_comments batchStatement.add( PostComments.i().getBoundStatementDelete( postId, commentTime, requestDeleteComment.getUserId() ) ); // delete from ig_app_data.post_comments_time batchStatement.add( PostCommentsTime.i().getBoundStatementDelete( postId, requestDeleteComment.getUserId() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // decrement ig_app_data.post_comments_count PostCommentsCount.i().executeSyncDecrement(postId); // set response ((ResponseDeleteComment)request.getResponseBody() ).set( requestDeleteComment.request_tracking_id); }
@Override protected void processRequest (final Request request) throws Exception { // check request this.checkRequest(request); // use the following request Object to process the request and set // the response to be returned RequestFollow requestFollow = (RequestFollow)request.getRequestJsonBody(); // make follow_user_id UUID UUID followUserId = UUID.fromString(requestFollow.follow_user_id); // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // insert into ig_app_data.following batchStatement.add( Following.i().getBoundStatementInsert( requestFollow.getUserId(), request.getStartTime(), followUserId) ); // insert into ig_app_data.following_time batchStatement.add( FollowingTime.i().getBoundStatementInsert( requestFollow.getUserId(), followUserId, request.getStartTime() ) ); // insert into ig_app_data.follower batchStatement.add( Follower.i().getBoundStatementInsert( followUserId, request.getStartTime(), requestFollow.getUserId() ) ); // insert into ig_app_data.follower_time batchStatement.add( FollowerTime.i().getBoundStatementInsert( followUserId, requestFollow.getUserId(), request.getStartTime() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // set response ((ResponseFollow)request.getResponseBody() ).set( requestFollow.request_tracking_id); }
@Override protected void processRequest (final Request request) throws Exception { // use the following request Object to process the request and set // the response to be returned RequestRefreshAccessToken requestRefreshAccessToken = (RequestRefreshAccessToken)request.getRequestJsonBody(); // generate new authentication tokens OAuth2Tokens oAuth2Tokens = new OAuth2Tokens(); // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // insert into auth_codes batchStatement.add( AuthCodes.i().getBoundStatementInsert( requestRefreshAccessToken.getUserId(), requestRefreshAccessToken.device_token, oAuth2Tokens.getAuthorizationCode(), oAuth2Tokens.getAccessToken(), oAuth2Tokens.getRefreshToken(), ((int)Constants.kAuthCodeLifeTime.getAs( TimeUnitType.SECOND).getValue() ) ) ); // delete from access_tokens batchStatement.add( AccessTokens.i().getBoundStatementDelete( requestRefreshAccessToken.getUserId(), requestRefreshAccessToken.device_token) ); // delete from refresh_tokens batchStatement.add( RefreshTokens.i().getBoundStatementDelete( requestRefreshAccessToken.getUserId(), requestRefreshAccessToken.device_token) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // set response ((ResponseRefreshAccessToken)request.getResponseBody() ).set( requestRefreshAccessToken.request_tracking_id, oAuth2Tokens.getAuthorizationCode() ); }
@Override protected void processRequest (final Request request) throws Exception { // use the following request Object to process the request and set // the response to be returned RequestUpdateFacebookInfo requestUpdateFacebookInfo = (RequestUpdateFacebookInfo)request.getRequestJsonBody(); // authenticate Facebook's access token FacebookAuthInl.validateFacebookAccessToken( requestUpdateFacebookInfo.fb_access_token, Constants.kFacebookAppId); // get user's facebook id FacebookGraph facebookGraph = new FacebookGraph(requestUpdateFacebookInfo.fb_access_token); String facebookId = facebookGraph.getUserId(); // get user's info from facebook // get user's Facebook profile picture String profilePicture = facebookGraph.getProfilePictureSync( Constants.kFacebookProfilePictureDimension); // database queries // insert into ig_blobs.profile_pictures_blobs ProfilePicturesBlobs.i().executeSyncInsert( requestUpdateFacebookInfo.getUserId(), EncodingInl.encodeStringIntoByteBuffer(profilePicture) ); try { // set profile_picture_id in ig_app_data.users_info UsersInfo.i().executeSyncSetProfilePictureId( requestUpdateFacebookInfo.getUserId(), requestUpdateFacebookInfo.getUserId() ); } catch (Exception e) { ProfilePicturesBlobs.i().executeSyncDelete( requestUpdateFacebookInfo.getUserId() ); throw e; } // all queries must succeed - ig_auth BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // insert into ig_auth.facebook_ids batchStatement.add( FacebookIds.i().getBoundStatementInsert( facebookId, requestUpdateFacebookInfo.getUserId() ) ); // set facebook_id in ig_auth.users_cred_ids batchStatement.add( UsersCredIds.i().getBoundStatementSetFacebookId( facebookId, requestUpdateFacebookInfo.getUserId() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // set response ((ResponseUpdateFacebookInfo)request.getResponseBody() ).set( requestUpdateFacebookInfo.request_tracking_id); }
@Override protected void processRequest (final Request request) throws Exception { // check request this.checkRequest(request); // use the following request Object to process the request and set // the response to be returned RequestLikePost requestLikePost = (RequestLikePost)request.getRequestJsonBody(); // set post_id UUID postId = UUID.fromString(requestLikePost.post_id); // all queries must succeed BatchStatement batchStatement = new BatchStatement(Type.LOGGED); // insert into ig_app_data.post_likes batchStatement.add( PostLikes.i().getBoundStatementInsert( postId, request.getStartTime(), requestLikePost.getUserId() ) ); // insert into ig_app_data.post_likes_time batchStatement.add( PostLikesTime.i().getBoundStatementInsert( postId, requestLikePost.getUserId(), request.getStartTime() ) ); // execute batch statement Cassandra.i().executeSync(batchStatement); // increment post's likes count PostLikesCount.i().executeSyncIncrement(postId); // set response ((ResponseLikePost)request.getResponseBody() ).set( requestLikePost.request_tracking_id, request.getStartTime() ); }
/**
 * Wraps a new, empty batch of the requested type together with the given
 * batchables and statement fetcher in a {@code BatchQueryFutureAdapter}.
 *
 * @param batchType        type of the batch statement to create
 * @param batchablesIt     iterator over the items to be turned into statements
 * @param statementFetcher function yielding the statement future of an item
 * @param <T>              type of the batchable items
 * @return the adapter, as a future of the resulting statement
 */
protected <T> ListenableFuture<Statement> mergeToBatch(Type batchType,
                                                       UnmodifiableIterator<T> batchablesIt,
                                                       Function<T, ListenableFuture<Statement>> statementFetcher) {
    BatchStatement emptyBatch = new BatchStatement(batchType);
    return new BatchQueryFutureAdapter<>(emptyBatch, batchablesIt, statementFetcher);
}
/**
 * Creates a LOGGED batch query over the two given mutations, combined via
 * {@code join(mutation1, mutation2)}.
 *
 * @param ctx       the context handed on to the delegating constructor
 * @param mutation1 first mutation of the batch
 * @param mutation2 second mutation of the batch
 */
BatchMutationQuery(Context ctx, Batchable<?> mutation1, Batchable<?> mutation2) { this(ctx, Type.LOGGED, join(mutation1, mutation2)); }
/**
 * Creates a batch query with an explicit batch type.
 *
 * @param ctx        the context passed to the superclass constructor
 * @param type       batch type stored on this query
 * @param batchables the mutations stored on this query
 */
private BatchMutationQuery(Context ctx, Type type, ImmutableList<Batchable<?>> batchables) {
    super(ctx);
    this.batchables = batchables;
    this.type = type;
}
/**
 * Creates a new query that shares this query's context but uses the given
 * batch type and batchables.
 *
 * @param type       batch type of the new query
 * @param batchables mutations of the new query
 * @return the newly created query
 */
private BatchMutationQuery newQuery(Type type, ImmutableList<Batchable<?>> batchables) {
    BatchMutationQuery derived = new BatchMutationQuery(getContext(), type, batchables);
    return derived;
}
/**
 * Returns a new query over the same batchables whose batch type is LOGGED.
 *
 * @return the new LOGGED query
 */
@Override
public BatchMutationQuery withWriteAheadLog() {
    final Type logged = Type.LOGGED;
    return newQuery(logged, this.batchables);
}
/**
 * Returns a new query over the same batchables whose batch type is UNLOGGED.
 *
 * @return the new UNLOGGED query
 */
@Override
public BatchMutationQuery withoutWriteAheadLog() {
    final Type unlogged = Type.UNLOGGED;
    return newQuery(unlogged, this.batchables);
}
/**
 * Collects the mutations of the given transaction via {@code getMutations}
 * into one UNLOGGED batch and passes that batch to {@code executeBatch}.
 *
 * @param transaction the transaction whose mutations are executed
 */
private void executeUpdatesSynchronous(DBTransaction transaction) {
    final BatchStatement unloggedBatch = new BatchStatement(Type.UNLOGGED);
    unloggedBatch.addAll(getMutations(transaction));
    executeBatch(unloggedBatch);
}