一尘不染

Calling methods in two different ReactiveMongoRepository's in a transaction using Spring Data MongoDB?

spring-boot

When using the reactive programming model in Spring Data MongoDB, you can execute a transaction like this:

Mono<DeleteResult> result = template.inTransaction()                                      
    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class));

But Spring Data MongoDB also has support for "reactive repositories", for example:

public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  Flux<Person> findByLocationNear(Point location, Distance distance);
}

public interface CarRepository extends ReactiveMongoRepository<Car, String> {

  Flux<Car> findByYear(int year);
}

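My question is: given that you have ReactiveMongoRepository's, can you somehow leverage MongoDB transactions, for example to insert both a Person and a Car in the same transaction (using PersonRepository and CarRepository in this case)? If so, how do you do it?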


2020-05-30

1 answer

一尘不染

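I had also been struggling to find a solution for reactive-style transaction support with MongoDB and Spring Boot.

Luckily I figured it out myself. A few of the things I found through Google were helpful, but they all covered the non-reactive API.

  • You need to use ReactiveMongoTransactionManager along with ReactiveMongoDatabaseFactory. Most of the details are at the end, and I am also sharing the code repo.

  • For MongoDB to support transactions, the database has to run as a replica set.

Why do we need that? Because otherwise you get an error like this: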

Sessions are not supported by the MongoDB cluster to which this client is connected

The steps to set this up are as follows:

  1. Run a docker-compose based MongoDB server using a docker-compose.yml like the one below:
version: "3"
services:
    mongo:
        hostname: mongo
        container_name: localmongo_docker
        image: mongo
        expose:
          - 27017
        ports:
          - 27017:27017
        restart: always
        entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
        volumes:
          - ./mongodata:/data/db # bind mount of the local ./mongodata directory (not a named Docker volume)
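  2. Once the container is up, run the following command (localmongo_docker is the name of the container; on images for MongoDB 6.0 and later the shell binary is mongosh instead of mongo):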
docker exec -it localmongo_docker mongo
  3. Copy, paste, and run the command below:
rs.initiate(
   {
     _id : 'rs0',
     members: [
       { _id : 0, host : "mongo:27017" }
     ]
   }
 )
  4. Then leave the shell by typing exit.

Important: the code repo can be found on my GitHub: https://github.com/krnbr/mongo-spring-boot-template

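Important notes about the code:

  • The MongoConfiguration class in the config package is the important part that makes transactions work.
  • The main part is this bean:
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
    return new ReactiveMongoTransactionManager(dbFactory);
}
  • To check that the transactional behaviour works, you can look at the UserService class in the service package.

In case the links do not work for someone, the code is shared here: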

Configuration and the bean inside it

@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}

application.properties (the MongoDB-related part)

spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
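
The replicaSet option in the URI has to name the same replica set that was passed to --replSet and used as _id in rs.initiate (rs0 above). As a rough illustration, the sketch below uses only the JDK to pull that option out of a connection string so a mismatch can be spotted early. The class and method names are hypothetical and not part of the repo, and it assumes a single-host URI like the one shown.

```java
import java.net.URI;
import java.util.Optional;

public class ReplicaSetUriCheck {

    // Hypothetical helper: returns the value of the replicaSet option, if present.
    static Optional<String> replicaSetName(String connectionString) {
        String query = URI.create(connectionString).getRawQuery();
        if (query == null) {
            return Optional.empty();
        }
        for (String pair : query.split("&")) {
            String[] kv = pair.split("=", 2);
            // Connection string option names are case-insensitive.
            if (kv.length == 2 && kv[0].equalsIgnoreCase("replicaSet")) {
                return Optional.of(kv[1]);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String uri = "mongodb://localhost:27017/mongo?replicaSet=rs0";
        System.out.println(replicaSetName(uri).orElse("<none>")); // prints "rs0"
    }
}
```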

Document classes

Role class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        // New until a creation timestamp has been set.
        return getCreated() == null;
    }
}

User class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id()
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList<>();

    @Override
    @JsonIgnore
    public boolean isNew() {
        // New until a creation timestamp has been set.
        return getCreated() == null;
    }
}

UserProfile class

@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @Override
    @JsonIgnore
    public boolean isNew() {
        // New until a creation timestamp has been set.
        return getCreated() == null;
    }

}

ReactiveMongoRepository interfaces

RoleRepository

public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);

}

UserRepository

public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);

}

UserProfileRepository

public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}

UserService class. You need to create your own RuntimeException class here; AppRuntimeException is the one I have been using.

@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '"+userRequest.getUsername()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if(ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile'"+userRequest.getMobile()+"' and/or - email '"+userRequest.getEmail()+"' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;

    }

}
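
Applied to the PersonRepository and CarRepository from the question, the same approach might look like the sketch below. It shows the declarative @Transactional form next to the programmatic TransactionalOperator from spring-tx. The service name, method names, and the bare-bones Person and Car classes are assumptions made for illustration, and a ReactiveMongoTransactionManager bean like the one configured above is assumed to be present.

```java
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;

// Placeholder entities and repositories standing in for the ones in the question.
class Person { public String id; public String name; }
class Car { public String id; public int year; }
interface PersonRepository extends ReactiveMongoRepository<Person, String> {}
interface CarRepository extends ReactiveMongoRepository<Car, String> {}

@Service
public class GarageService { // hypothetical name

    private final PersonRepository people;
    private final CarRepository cars;
    private final TransactionalOperator tx;

    public GarageService(PersonRepository people,
                         CarRepository cars,
                         ReactiveTransactionManager txManager) {
        this.people = people;
        this.cars = cars;
        this.tx = TransactionalOperator.create(txManager);
    }

    // Declarative: both saves join the transaction opened when the returned Mono is subscribed.
    @Transactional
    public Mono<Car> savePersonAndCar(Person person, Car car) {
        return people.save(person).then(cars.save(car));
    }

    // Programmatic: the same pipeline, wrapped explicitly in a transaction.
    public Mono<Car> savePersonAndCarProgrammatic(Person person, Car car) {
        return people.save(person)
                .then(cars.save(car))
                .as(tx::transactional);
    }
}
```

In both variants an error signal from either save should roll back the whole transaction, so neither document is persisted on its own.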

Controller and model classes

UserRequest model class

@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;

}

UserProfileApisController

@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }

}
2020-05-30