我想要几个“包”(Mjbundle),它们本质上是问题包(Mjquestion)。Mjquestion 有一个整数“索引”属性,该属性必须是唯一的,但它应该只在包含它的包中是唯一的。我不确定如何正确地对这样的事情进行建模,我尝试使用下面的结构化(重复)属性来做到这一点,但实际上还没有任何东西可以限制 Mjquestion 索引的唯一性。有什么更好/正常/正确的方法来做到这一点?
class Mjquestion(ndb.Model): """This is a Mjquestion.""" index = ndb.IntegerProperty(indexed=True, required=True) genre1 = ndb.IntegerProperty(indexed=False, required=True, choices=[1,2,3,4,5,6,7]) genre2 = ndb.IntegerProperty(indexed=False, required=True, choices=[1,2,3]) #(will add a bunch of more data properties later) class Mjbundle(ndb.Model): """This is a Mjbundle.""" mjquestions = ndb.StructuredProperty(Mjquestion, repeated=True) time = ndb.DateTimeProperty(auto_now_add=True)
(使用上述模型并获取了某个 Mjbundle 实体后,我不确定如何根据索引从 mjquestions 中快速获取 Mjquestion。关于结构化属性过滤的解释看起来适用于 Mjbundle 类型级别,而我已经有一个 Mjbundle 实体,并且不确定如何快速查询该实体包含的问题,而无需在代码中“手动”循环遍历它们。)
你在尝试设计一个包含多个 Mjquestion 的 Mjbundle 模型时,遇到的问题主要集中在两个方面:
Mjquestion
Mjbundle
index
Mjquestion.index
为了确保每个 Mjbundle 内的 Mjquestion 有唯一的 index,你可以通过以下两种方法实现:
在保存 Mjquestion 时检查唯一性: 在将 Mjquestion 添加到 Mjbundle 中时,检查当前包中是否已经存在该 index,如果存在则抛出异常。这是一种“软”保证,确保唯一性,但需要手动检查。
在数据库层面使用事务性检查: 使用 Google Cloud Datastore 的事务(ndb.transaction)来确保在同一时刻,只有一个操作会修改一个 Mjbundle 中的 Mjquestion 列表,这样可以避免并发操作中的唯一性冲突。
ndb.transaction
由于你使用了 ndb.StructuredProperty 来存储 Mjquestion 列表,直接通过 index 在 Mjquestion 内部查询可能会比较低效,因为你必须手动遍历列表进行搜索。如果你希望更高效地查询,你有几个选择:
ndb.StructuredProperty
```python class Mjbundle(ndb.Model): “”“This is a Mjbundle.”“” mjquestions = ndb.StructuredProperty(Mjquestion, repeated=True) time = ndb.DateTimeProperty(auto_now_add=True)
def get_question_by_index(self, index): """通过 index 查找 Mjquestion。""" question_dict = {q.index: q for q in self.mjquestions} return question_dict.get(index, None)
```
这样,你就可以通过 get_question_by_index() 方法,快速通过 index 获取对应的 Mjquestion,而不需要遍历整个列表。
get_question_by_index()
```python class Mjquestion(ndb.Model): “”“This is a Mjquestion.”“” index = ndb.IntegerProperty(indexed=True, required=True) genre1 = ndb.IntegerProperty(indexed=False, required=True, choices=[1,2,3,4,5,6,7]) genre2 = ndb.IntegerProperty(indexed=False, required=True, choices=[1,2,3]) bundle_key = ndb.KeyProperty(kind=Mjbundle) # 关联的 Mjbundle
class Mjbundle(ndb.Model): “”“This is a Mjbundle.”“” time = ndb.DateTimeProperty(auto_now_add=True)
def get_question_by_index(self, index): """通过 index 获取 Mjquestion。""" question = Mjquestion.query(Mjquestion.bundle_key == self.key, Mjquestion.index == index).get() return question
这种方式会让你能直接从 Mjquestion 进行高效查询,并且保证 index 在每个 Mjbundle 中唯一(通过 Mjquestion.query 进行查询时,使用 bundle_key 来限定范围)。
Mjquestion.query
bundle_key
class Mjquestion(ndb.Model): """This is a Mjquestion.""" index = ndb.IntegerProperty(indexed=True, required=True) genre1 = ndb.IntegerProperty(indexed=False, required=True, choices=[1, 2, 3, 4, 5, 6, 7]) genre2 = ndb.IntegerProperty(indexed=False, required=True, choices=[1, 2, 3]) bundle_key = ndb.KeyProperty(kind='Mjbundle') # 关联的 Mjbundle class Mjbundle(ndb.Model): """This is a Mjbundle.""" mjquestions = ndb.StructuredProperty(Mjquestion, repeated=True) time = ndb.DateTimeProperty(auto_now_add=True) def get_question_by_index(self, index): """通过 index 获取 Mjquestion。""" question_dict = {q.index: q for q in self.mjquestions} return question_dict.get(index, None) def add_question(self, question): """添加 Mjquestion,确保索引唯一。""" if any(q.index == question.index for q in self.mjquestions): raise ValueError(f"Index {question.index} already exists in this bundle.") self.mjquestions.append(question) self.put() # 保存 Mjbundle # 示例使用: bundle = Mjbundle(time=datetime.datetime.now()) bundle.put() # 创建问题 question1 = Mjquestion(index=1, genre1=2, genre2=1, bundle_key=bundle.key) question2 = Mjquestion(index=2, genre1=3, genre2=2, bundle_key=bundle.key) # 将问题添加到包中 bundle.add_question(question1) bundle.add_question(question2) # 根据索引查询问题 q = bundle.get_question_by_index(1) print(q) # 打印 index 为 1 的问题