The following 50 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.orm.joinedload(). The snippets are shown as they appear in their projects and assume the relevant imports, typically from sqlalchemy.orm import joinedload (some examples instead import the module and call orm.joinedload).
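
Before the project examples, here is a minimal, self-contained sketch of what joinedload() does. The User/Address models and the in-memory SQLite engine are illustrative assumptions rather than code from any project below; the snippet targets SQLAlchemy 1.4 or newer.

from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, joinedload, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    addresses = relationship("Address", back_populates="user")

class Address(Base):
    __tablename__ = "addresses"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    user_id = Column(Integer, ForeignKey("users.id"))
    user = relationship("User", back_populates="addresses")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as sess:
    sess.add(User(name="ada", addresses=[Address(email="ada@example.com")]))
    sess.commit()

    # joinedload() makes this one SELECT with a LEFT OUTER JOIN, so each
    # User arrives with its addresses collection already populated.
    users = sess.query(User).options(joinedload(User.addresses)).all()
    print(users[0].addresses)  # already loaded; no extra SQL is emitted here

The point of joinedload() over plain lazy loading is query count: without it, touching .addresses on N users triggers N additional SELECTs (the classic "N+1" problem); with it, everything arrives in one round trip.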

Example 1:

def _load_dmail(dispatcher, dmail_dbid, fetch_parts=False, fetch_tags=False):
    def dbcall():
        with dispatcher.node.db.open_session(True) as sess:
            q = sess.query(DmailMessage)

            if fetch_parts:
                q = q.options(joinedload("parts"))
            if fetch_tags:
                q = q.options(joinedload("tags"))

            q = q.filter(DmailMessage.id == dmail_dbid)

            dm = q.first()

            sess.expunge_all()

            return dm

    dm = yield from dispatcher.node.loop.run_in_executor(None, dbcall)

    return dm

Example 2:

def _process_dmail_message(dispatcher, msg_dbid, process_call,
                           fetch_parts=False, fetch_tags=False):
    def dbcall():
        with dispatcher.node.db.open_session() as sess:
            q = sess.query(DmailMessage)

            if fetch_parts:
                q = q.options(joinedload("parts"))
            if fetch_tags:
                q = q.options(joinedload("tags"))

            q = q.filter(DmailMessage.id == msg_dbid)

            dm = q.first()

            if process_call(sess, dm):
                sess.expire_on_commit = False
                sess.commit()

            sess.expunge_all()

            return dm

    dm = yield from dispatcher.node.loop.run_in_executor(None, dbcall)

    return dm

Example 3:

def _start_dmail_autoscan(self):
    yield from self.engine.protocol_ready.wait()

    def dbcall():
        with self.db.open_session() as sess:
            q = sess.query(DmailAddress)\
                .options(joinedload("keys"))\
                .filter(DmailAddress.scan_interval > 0)

            r = q.all()

            sess.expunge_all()

            return r

    addrs = yield from self.loop.run_in_executor(None, dbcall)

    for addr in addrs:
        self.update_dmail_autoscan(addr)

Example 4:

def get_current_user(self):
    """Get currently logged in user.

    The currently logged in user_id is stored in a secure cookie
    by Python Social Auth.
    """
    # This cookie is set by Python Social Auth's BaseHandler:
    # https://github.com/python-social-auth/social-app-tornado/blob/master/social_tornado/handlers.py
    user_id = self.get_secure_cookie('user_id')
    if user_id is None:
        return None
    else:
        return User.query\
            .options(joinedload('acls'))\
            .get(int(user_id))

Example 5:

def list_machines(self, locked=False):
    """Lists virtual machines.
    @return: list of virtual machines
    """
    session = self.Session()
    try:
        if locked:
            machines = session.query(Machine).options(
                joinedload("tags")).filter_by(locked=True).all()
        else:
            machines = session.query(Machine).options(
                joinedload("tags")).all()
        return machines
    except SQLAlchemyError as e:
        log.debug("Database error listing machines: {0}".format(e))
        return []
    finally:
        session.close()

Example 6:

def view_task(self, task_id, details=False):
    """Retrieve information on a task.
    @param task_id: ID of the task to query.
    @return: details on the task.
    """
    session = self.Session()
    try:
        if details:
            task = session.query(Task).options(
                joinedload("guest"), joinedload("errors"),
                joinedload("tags")).get(task_id)
        else:
            task = session.query(Task).get(task_id)
    except SQLAlchemyError as e:
        log.debug("Database error viewing task: {0}".format(e))
        return None
    else:
        if task:
            session.expunge(task)
        return task
    finally:
        session.close()

Example 7:

def view_machine(self, name):
    """Show virtual machine.
    @param name: virtual machine name
    @return: virtual machine's details
    """
    session = self.Session()
    try:
        machine = session.query(Machine).options(
            joinedload("tags")).filter(Machine.name == name).first()
    except SQLAlchemyError as e:
        log.debug("Database error viewing machine: {0}".format(e))
        return None
    else:
        if machine:
            session.expunge(machine)
    finally:
        session.close()
    return machine

Example 8:

def database_poll_loop(self):
    """
    Keeps checking database for pending requests.
    Submits a new job if one is found.
    """
    self.logger.info("Scheduler starts watching database.")
    while self.is_running:
        session = Session()
        pending_requests = (
            session.query(Request).
            options(joinedload('options')).
            filter_by(pending=True).
            all()
        )
        self.logger.debug("Found %d requests", len(pending_requests))
        try:
            for request in pending_requests:
                self.submit_job(request)
        finally:
            session.commit()
            session.close()
        self._shutdown_event.wait(5)

Example 9:

def with_(cls, schema):
    """
    Query class and eager load schema at once.
    :type schema: dict

    Example:
        schema = {
            'user': JOINED,  # joinedload user
            'comments': (SUBQUERY, {  # load comments in separate query
                'user': JOINED  # but, in this separate query, join user
            })
        }
        # the same schema using class properties:
        schema = {
            Post.user: JOINED,
            Post.comments: (SUBQUERY, {
                Comment.user: JOINED
            })
        }
        User.with_(schema).first()
    """
    return cls.query.options(*eager_expr(schema or {}))

Example 10:

def with_joined(cls, *paths):
    """
    Eager load for simple cases where we just need to
    joined-load some relations.
    In string syntax, you can split relations with a dot
    thanks to this SQLAlchemy feature: https://goo.gl/yM2DLX

    :type paths: *List[str] | *List[InstrumentedAttribute]

    Example 1:
        Comment.with_joined('user', 'post', 'post.comments').first()

    Example 2:
        Comment.with_joined(Comment.user, Comment.post).first()
    """
    options = [joinedload(path) for path in paths]
    return cls.query.options(*options)

Example 11:

def specific_festival(festival_route):
    """Show festival specific form to select desired artist for playlist."""

    festival_info = Festival.query.filter_by(
        festival_route=festival_route).options(
        joinedload('festivalartists')).first()

    artist_list = festival_info.festivalartists

    artist_info = []
    helper_func.build_lineup_list(artist_list, artist_info)

    # print "\nFinal Artist Info List:", artist_info, "\n"
    # print "\nLength of Artist Info List:", len(artist_info), "\n\n"

    return render_template("festival_specific.html",
                           festival_info=festival_info,
                           artist_info=artist_info,
                           length=len(artist_info))

Example 12:

def reproduce_provider(provider, provider_path, session):
    """Reproduce an experiment from a data repository (provider).
    """
    # Check the database for an experiment already stored matching the URI
    provider_key = '%s/%s' % (provider, provider_path)
    upload = (session.query(database.Upload)
              .options(joinedload(database.Upload.experiment))
              .filter(database.Upload.provider_key == provider_key)
              .order_by(database.Upload.id.desc())).first()
    if not upload:
        try:
            upload = get_experiment_from_provider(
                session, request.remote_addr, provider, provider_path)
        except ProviderError as e:
            return render_template('setup_notfound.html',
                                   message=e.message), 404

    # Also updates last access
    upload.experiment.last_access = functions.now()

    return reproduce_common(upload, session)

Example 13:

def reproduce_local(upload_short_id, session):
    """Show build log and ask for run parameters.
    """
    # Decode info from URL
    app.logger.info("Decoding %r", upload_short_id)
    try:
        upload_id = short_ids.decode('upload', upload_short_id)
    except ValueError:
        return render_template('setup_notfound.html'), 404

    # Look up the experiment in database
    upload = (session.query(database.Upload)
              .options(joinedload(database.Upload.experiment))
              .get(upload_id))
    if not upload:
        return render_template('setup_notfound.html'), 404

    # Also updates last access
    upload.experiment.last_access = functions.now()

    return reproduce_common(upload, session)

Example 14:

def get_all_rolling_status(self):
    life_roll_list = []
    for machine in self.session.query(Machine) \
            .join(LifecycleRolling) \
            .join(MachineInterface) \
            .options(joinedload("interfaces")) \
            .options(joinedload("lifecycle_rolling")) \
            .filter(MachineInterface.as_boot == True):
        try:
            life_roll_list.append({
                "mac": machine.interfaces[0].mac,
                "fqdn": machine.interfaces[0].fqdn,
                "cidrv4": machine.interfaces[0].cidrv4,
                "enable": bool(machine.lifecycle_rolling[0].enable),
                "created_date": machine.lifecycle_rolling[0].created_date,
                "updated_date": machine.lifecycle_rolling[0].updated_date,
            })
        except IndexError:
            pass

    return life_roll_list

Example 15:

def get_machines_by_roles(self, *roles):
    if len(roles) == 1:
        return self.get_machines_by_role(roles[0])
    machines = []
    roles = list(roles)
    with self.smart.new_session() as session:
        for machine in session.query(Machine) \
                .options(joinedload("interfaces")) \
                .options(joinedload("disks")) \
                .join(Schedule) \
                .filter(MachineInterface.as_boot == True):
            # TODO Maybe do this with a sqlalchemy filter func
            if len(machine.schedules) == len(roles) and \
                    set(k.role for k in machine.schedules) == set(roles):
                machines.append(self._construct_machine_dict(machine, roles))

    return machines

Example 16:

def _host_get_by_uuid(context, host_uuid, segment_uuid=None):
    query = model_query(context, models.Host
                        ).filter_by(uuid=host_uuid
                                    ).options(joinedload('failover_segment'))

    if segment_uuid:
        query = query.filter_by(failover_segment_id=segment_uuid)

    result = query.first()

    if not result:
        if segment_uuid:
            raise exception.HostNotFoundUnderFailoverSegment(
                host_uuid=host_uuid, segment_uuid=segment_uuid)
        else:
            raise exception.HostNotFound(id=host_uuid)

    return result

Example 17:

def floating_ip_fixed_ip_associate(context, floating_address,
                                   fixed_address, host):
    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(address=fixed_address).\
        options(joinedload('network')).\
        first()
    if not fixed_ip_ref:
        raise exception.FixedIpNotFoundForAddress(address=fixed_address)
    rows = model_query(context, models.FloatingIp).\
        filter_by(address=floating_address).\
        filter(models.FloatingIp.project_id == context.project_id).\
        filter(or_(models.FloatingIp.fixed_ip_id == fixed_ip_ref['id'],
                   models.FloatingIp.fixed_ip_id.is_(None))).\
        update({'fixed_ip_id': fixed_ip_ref['id'], 'host': host})

    if not rows:
        raise exception.FloatingIpAssociateFailed(address=floating_address)

    return fixed_ip_ref

Example 18:

def floating_ip_disassociate(context, address):
    floating_ip_ref = model_query(context, models.FloatingIp).\
        filter_by(address=address).\
        first()
    if not floating_ip_ref:
        raise exception.FloatingIpNotFoundForAddress(address=address)

    fixed_ip_ref = model_query(context, models.FixedIp).\
        filter_by(id=floating_ip_ref['fixed_ip_id']).\
        options(joinedload('network')).\
        first()
    floating_ip_ref.fixed_ip_id = None
    floating_ip_ref.host = None

    return fixed_ip_ref

Example 19:

def fixed_ip_get(context, id, get_network=False):
    query = model_query(context, models.FixedIp).filter_by(id=id)
    if get_network:
        query = query.options(joinedload('network'))

    result = query.first()
    if not result:
        raise exception.FixedIpNotFound(id=id)

    # FIXME(sirp): shouldn't we just use project_only here to restrict the
    # results?
    if (nova.context.is_user_context(context) and
            result['instance_uuid'] is not None):
        instance = instance_get_by_uuid(context.elevated(read_deleted='yes'),
                                        result['instance_uuid'])
        nova.context.authorize_project_context(context, instance.project_id)

    return result

Example 20:

def _build_instance_get(context, columns_to_join=None):
    query = model_query(context, models.Instance, project_only=True).\
        options(joinedload_all('security_groups.rules')).\
        options(joinedload('info_cache'))
    if columns_to_join is None:
        columns_to_join = ['metadata', 'system_metadata']
    for column in columns_to_join:
        if column in ['info_cache', 'security_groups']:
            # Already always joined above
            continue
        if 'extra.' in column:
            query = query.options(undefer(column))
        else:
            query = query.options(joinedload(column))
    # NOTE(alaski) Stop lazy loading of columns not needed.
    for col in ['metadata', 'system_metadata']:
        if col not in columns_to_join:
            query = query.options(noload(col))
    return query

Example 21:

def _manual_join_columns(columns_to_join):
    """Separate manually joined columns from columns_to_join

    If columns_to_join contains 'metadata', 'system_metadata', or
    'pci_devices' those columns are removed from columns_to_join and added
    to a manual_joins list to be used with the _instances_fill_metadata
    method.

    The columns_to_join formal parameter is copied and not modified; the
    return tuple has the modified columns_to_join list to be used with
    joinedload in a model query.

    :param columns_to_join: List of columns to join in a model query.
    :return: tuple of (manual_joins, columns_to_join)
    """
    manual_joins = []
    columns_to_join_new = copy.copy(columns_to_join)
    for column in ('metadata', 'system_metadata', 'pci_devices'):
        if column in columns_to_join_new:
            columns_to_join_new.remove(column)
            manual_joins.append(column)
    return manual_joins, columns_to_join_new
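
For illustration (a hypothetical input, not from the source project): _manual_join_columns(['metadata', 'info_cache']) returns (['metadata'], ['info_cache']), so 'metadata' gets filled in manually via _instances_fill_metadata while 'info_cache' is still eager-loaded with joinedload, as Example 22 below shows.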

Example 22:

def instance_get_all(context, columns_to_join=None):
    if columns_to_join is None:
        columns_to_join_new = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))
    query = model_query(context, models.Instance)
    for column in columns_to_join_new:
        query = query.options(joinedload(column))
    if not context.is_admin:
        # If we're not admin context, add appropriate filter..
        if context.project_id:
            query = query.filter_by(project_id=context.project_id)
        else:
            query = query.filter_by(user_id=context.user_id)
    instances = query.all()
    return _instances_fill_metadata(context, instances, manual_joins)

Example 23:

def console_pool_get_by_host_type(context, compute_host, host,
                                  console_type):
    result = model_query(context, models.ConsolePool, read_deleted="no").\
        filter_by(host=host).\
        filter_by(console_type=console_type).\
        filter_by(compute_host=compute_host).\
        options(joinedload('consoles')).\
        first()

    if not result:
        raise exception.ConsolePoolNotFoundForHostType(
            host=host, console_type=console_type, compute_host=compute_host)

    return result

Example 24:

def console_get(context, console_id, instance_uuid=None):
    query = model_query(context, models.Console, read_deleted="yes").\
        filter_by(id=console_id).\
        options(joinedload('pool'))

    if instance_uuid is not None:
        query = query.filter_by(instance_uuid=instance_uuid)

    result = query.first()

    if not result:
        if instance_uuid:
            raise exception.ConsoleNotFoundForInstance(
                console_id=console_id, instance_uuid=instance_uuid)
        else:
            raise exception.ConsoleNotFound(console_id=console_id)

    return result

Example 25:

def aggregate_get_by_host(context, host, key=None):
    """Return rows that match host (mandatory) and metadata key (optional).

    :param host: matches host, and is required.
    :param key: matches metadata key, if not None.
    """
    query = model_query(context, models.Aggregate)
    query = query.options(joinedload('_hosts'))
    query = query.options(joinedload('_metadata'))
    query = query.join('_hosts')
    query = query.filter(models.AggregateHost.host == host)

    if key:
        query = query.join("_metadata").filter(
            models.AggregateMetadata.key == key)
    return query.all()

Example 26:

def find_by_name(name: str) -> Optional[BoardModel]:
    if not validation.check_board_name_validity(name):
        raise ArgumentError(MESSAGE_INVALID_NAME)

    board_cache = cache.get(cache_key('board_and_config', name))
    if not board_cache:
        with session() as s:
            q = s.query(BoardOrmModel).filter_by(name=name)
            q = q.options(joinedload('config'))
            board_orm_model = q.one_or_none()
            if not board_orm_model:
                return None

            board = BoardModel.from_orm_model(board_orm_model,
                                              include_config=True)
            cache.set(cache_key('board_and_config', name), board.to_cache())
            return board

    return BoardModel.from_cache(board_cache)

Example 27:

def list_web_hook(self):
    session = SessionManager.Session()
    try:
        web_hook_list = session.query(WebHook).\
            options(joinedload(WebHook.created_by)).\
            order_by(desc(getattr(WebHook, 'register_time'))).\
            all()
        web_hook_dict_list = []
        for web_hook in web_hook_list:
            web_hook_dict = row2dict(web_hook)
            web_hook_dict.pop('shared_secret', None)
            self.__process_user_obj_in_web_hook(web_hook, web_hook_dict)
            web_hook_dict_list.append(web_hook_dict)
        return json_resp({
            'data': web_hook_dict_list,
            'total': len(web_hook_list)
        })
    finally:
        SessionManager.Session.remove()

Example 28:

def get_episode(self, episode_id):
    try:
        session = SessionManager.Session()
        # .one() (rather than .all()) returns a single row and raises
        # NoResultFound when nothing matches, which the except clause handles.
        episode = session.query(Episode).\
            options(joinedload(Episode.thumbnail_image)).\
            filter(Episode.id == episode_id).\
            filter(Episode.delete_mark == None).\
            one()
        episode_dict = row2dict(episode)
        utils.process_episode_dict(episode, episode_dict)
        return json_resp({'data': episode_dict})
    except NoResultFound:
        raise ClientError(ClientError.NOT_FOUND, 404)
    finally:
        SessionManager.Session.remove()

Example 29:

def _get_agents_real(request, user_id=None, view_def='default'):
    discussion = request.discussion
    user_id = user_id or Everyone
    agents = discussion.get_participants_query()
    permissions = request.permissions
    include_emails = P_ADMIN_DISC in permissions or P_SYSADMIN in permissions
    if include_emails:
        agents = agents.options(joinedload(AgentProfile.accounts))
    num_posts_per_user = \
        AgentProfile.count_posts_in_discussion_all_profiles(discussion)

    def view(agent):
        result = agent.generic_json(view_def, user_id, permissions)
        if result is None:
            return
        if include_emails or agent.id == user_id:
            result['preferred_email'] = agent.get_preferred_email()
        post_count = num_posts_per_user.get(agent.id, 0)
        if post_count:
            result['post_count'] = post_count
        return result

    return [view(agent) for agent in agents if agent is not None]

Example 30:

def select_view(userid, form):
    report = (
        Report.query
        .options(joinedload('comments', innerjoin=True)
                 .joinedload('poster', innerjoin=True))
        .get_or_404(int(form.reportid)))
    report.old_style_comments = [
        {
            'userid': c.userid,
            'username': c.poster.profile.username,
            'unixtime': c.unixtime,
            'content': c.content,
            'violation': _convert_violation(c.violation),
        } for c in report.comments]
    media.populate_with_user_media(report.old_style_comments)
    report.old_style_comments.sort(key=lambda c: c['unixtime'])
    return report

Example 31:

def select_reported_list(userid):
    q = (
        Report.query
        .join(ReportComment)
        .options(contains_eager(Report.comments))
        .options(joinedload('_target_sub'))
        .options(joinedload('_target_char'))
        .options(joinedload('_target_journal'))
        .filter(ReportComment.violation != 0)
        .filter_by(userid=userid))
    reports = q.all()
    for report in reports:
        report.latest_report = max(c.unixtime for c in report.comments)

    reports.sort(key=lambda r: r.latest_report, reverse=True)
    return reports

Example 32:

def bucket_links(cls, identities):
    if not identities:
        return []

    q = (
        cls.dbsession.query(MediaItem, cls)
        .with_polymorphic([DiskMediaItem])
        .join(cls, *cls._linkjoin)
        .options(joinedload('described'))
        .options(joinedload(cls._linkname))
        .filter(getattr(cls, cls._identity).in_(identities)))

    for load in cls._load:
        q = q.options(joinedload(load))

    buckets = collections.defaultdict(lambda: collections.defaultdict(list))

    for media_item, link in q.all():
        media_data = media_item.serialize(link=link)
        buckets[getattr(link, cls._identity)][link.link_type].append(
            media_data)

    return [dict(buckets[identity]) for identity in identities]

Example 33:

def _volume_type_get_by_name(context, name, session=None):
    result = model_query(context, models.VolumeTypes, session=session).\
        options(joinedload('extra_specs')).\
        filter_by(name=name).\
        first()

    if not result:
        raise exceptions.VolumeTypeNotFoundByName(volume_type_name=name)

    return _dict_with_extra_specs_if_authorized(context, result)

Example 34:

def _volume_type_get_query(context, session=None, read_deleted='no'):
    query = model_query(context,
                        models.VolumeTypes,
                        session=session,
                        read_deleted=read_deleted).\
        options(joinedload('extra_specs'))

    if not context.is_admin:
        is_public = True
        the_filter = [models.VolumeTypes.is_public == is_public]
        # filter() returns a new Query, so the result must be reassigned
        query = query.filter(or_(*the_filter))

    return query

Example 35:

def _volume_type_ref_get(context, id, session=None, inactive=False):
    read_deleted = "yes" if inactive else "no"
    result = model_query(context,
                         models.VolumeTypes,
                         session=session,
                         read_deleted=read_deleted).\
        options(joinedload('extra_specs')).\
        filter_by(id=id).\
        first()

    if not result:
        raise exceptions.VolumeTypeNotFound(volume_type_id=id)

    return result

Example 36:

def _get_migration_task_query_options(query):
    return query.options(
        orm.joinedload("tasks").joinedload("progress_updates")).options(
            orm.joinedload("tasks").joinedload("events"))
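
Chaining the loader options, as in Examples 36 and 37, eager-loads a nested relationship path: orm.joinedload("tasks").joinedload("progress_updates") joins the tasks relationship and, for each task, its progress_updates, within a single statement.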

Example 37:

def get_task(context, task_id, include_migration_tasks=False):
    join_options = orm.joinedload("migration")
    if include_migration_tasks:
        join_options = join_options.joinedload("tasks")

    return _soft_delete_aware_query(context, models.Task).options(
        join_options).filter_by(id=task_id).first()

Example 38:

def get_schema_query(session, id):
    return session.query(Item)\
        .options(joinedload(Item.modifiers))\
        .filter_by(id=id)

Example 39:

def get_schema_query(session, uid):
    return session.query(Player)\
        .options(joinedload(Player.characters))\
        .filter_by(uid=uid)

Example 40:

def get_schema_query(session, uid):
    return session.query(Region)\
        .options(joinedload(Region.sectors))\
        .filter_by(uid=uid)

Example 41:

def port_find(context, limit=None, sorts=['id'], marker_obj=None,
              fields=None, **filters):
    query = context.session.query(models.Port).options(
        orm.joinedload(models.Port.ip_addresses))
    model_filters = _model_query(context, models.Port, filters)

    if filters.get("ip_address_id"):
        model_filters.append(models.Port.ip_addresses.any(
            models.IPAddress.id.in_(filters["ip_address_id"])))

    if filters.get("device_id"):
        model_filters.append(models.Port.device_id.in_(filters["device_id"]))

    if filters.get("service"):
        model_filters.append(models.Port.associations.any(
            models.PortIpAssociation.service == filters["service"]))

    if "join_security_groups" in filters:
        query = query.options(orm.joinedload(models.Port.security_groups))

    if fields and "port_subnets" in fields:
        query = query.options(orm.joinedload("ip_addresses.subnet"))
        query = query.options(
            orm.joinedload("ip_addresses.subnet.dns_nameservers"))
        query = query.options(
            orm.joinedload("ip_addresses.subnet.routes"))

    return paginate_query(query.filter(*model_filters), models.Port, limit,
                          sorts, marker_obj)

Example 42:

def port_find_by_ip_address(context, **filters):
    query = context.session.query(models.IPAddress).options(
        orm.joinedload(models.IPAddress.ports))
    model_filters = _model_query(context, models.IPAddress, filters)
    return query.filter(*model_filters)

Example 43:

def _network_find(context, limit, sorts, marker, page_reverse, fields,
                  defaults=None, provider_query=False, **filters):
    query = context.session.query(models.Network)
    model_filters = _model_query(context, models.Network, filters, query)

    if defaults:
        invert_defaults = False
        if INVERT_DEFAULTS in defaults:
            invert_defaults = True
            defaults.pop(0)
        if filters and invert_defaults:
            query = query.filter(and_(not_(models.Network.id.in_(defaults)),
                                      and_(*model_filters)))
        elif not provider_query and filters and not invert_defaults:
            query = query.filter(or_(models.Network.id.in_(defaults),
                                     and_(*model_filters)))
        elif not invert_defaults:
            query = query.filter(models.Network.id.in_(defaults))
    else:
        query = query.filter(*model_filters)

    if "join_subnets" in filters:
        query = query.options(orm.joinedload(models.Network.subnets))

    return paginate_query(query, models.Network, limit, sorts, marker)

Example 44:

def security_group_find(context, **filters):
    query = context.session.query(models.SecurityGroup).options(
        orm.joinedload(models.SecurityGroup.rules))
    model_filters = _model_query(context, models.SecurityGroup, filters)
    return query.filter(*model_filters)

Example 45:

def get_target_list():
    orm_session = model.Session()
    targets = orm_session.query(model.Target).options(
        joinedload("target_parms")).all()
    forwarder_classes = forwarder.Forwarder.get_forwarder_classnames()
    orm_session.close()
    if json_check():
        return jsonify(items=[target.dict_repr() for target in targets])
    return render_template("target_list.html.tpl", targets=targets,
                           forwarder_classes=forwarder_classes)

Example 46:

def get_target(name):
    orm_session = model.Session()
    target = orm_session.query(model.Target).options(
        joinedload("target_parms")).filter(
        model.Target.target_name == name).first()
    orm_session.close()
    if target is None:
        error_msg = "Target " + name + " not found!"
        if json_check():
            return json_error(error_msg, 404)
        flash(error_msg, "alert-danger")
        return redirect("/targets")
    else:
        if json_check():
            return jsonify(target.dict_repr())
        return render_template("target_view.html.tpl", target=target)

Example 47:

def test_target(name):
    # check if message parameter is set
    message = None
    try:
        message = request.json["message"]
    except:
        pass
    try:
        message = request.form["message"]
    except:
        pass

    orm_session = model.Session()
    target = orm_session.query(model.Target).options(
        joinedload("target_parms")).filter(
        model.Target.target_name == name).first()
    orm_session.close()
    if target is None:
        error_msg = "Target " + name + " not found!"
        if json_check():
            return json_error(error_msg, 404)
        flash(error_msg, "alert-danger")
        return redirect("/targets")
    else:
        forwarder_obj = forwarder.Forwarder.create_forwarder(
            target.target_name, target.target_class, target.target_parms)
        forwarder_obj.test_forwarder(message)
        result_msg = "Target " + name + " tested"
        if json_check():
            return json_result(result_msg, 200)
        flash(result_msg, "alert-success")
        return redirect("/targets")

Example 48:

def _load_dmail_address(dispatcher, dbid=None, site_key=None,
                        fetch_keys=False):
    "Fetch from our database the parameters that are stored in a DMail site."
    def dbcall():
        with dispatcher.node.db.open_session() as sess:
            q = sess.query(DmailAddress)

            if fetch_keys:
                q = q.options(joinedload("keys"))

            if dbid:
                q = q.filter(DmailAddress.id == dbid)
            elif site_key:
                q = q.filter(DmailAddress.site_key == site_key)
            else:
                raise Exception("Either dbid or site_key must be specified.")

            dmailaddr = q.first()

            if not dmailaddr:
                return None

            sess.expunge_all()

            return dmailaddr

    dmailaddr = yield from dispatcher.loop.run_in_executor(None, dbcall)

    return dmailaddr

Example 49:

def _load_default_dmail_address(dispatcher, fetch_keys=False):
    def dbcall():
        with dispatcher.node.db.open_session() as sess:
            q = sess.query(NodeState)\
                .filter(NodeState.key == consts.NSK_DEFAULT_ADDRESS)
            ns = q.first()

            if ns:
                q = sess.query(DmailAddress)\
                    .filter(DmailAddress.id == int(ns.value))

                if fetch_keys:
                    q = q.options(joinedload("keys"))

                addr = q.first()

                if addr:
                    sess.expunge_all()
                    return addr

            addr = sess.query(DmailAddress)\
                .order_by(DmailAddress.id)\
                .limit(1)\
                .first()

            if addr:
                sess.expire_on_commit = False
                ns = NodeState()
                ns.key = consts.NSK_DEFAULT_ADDRESS
                ns.value = str(addr.id)
                sess.add(ns)
                sess.commit()
                sess.expunge_all()

            return addr

    addr = yield from dispatcher.loop.run_in_executor(None, dbcall)

    return addr

Example 50:

def _process_dmail_address(dispatcher, process_call, dbid=None, site_key=None,
                           fetch_keys=False):
    def dbcall():
        with dispatcher.node.db.open_session() as sess:
            q = sess.query(DmailAddress)

            if fetch_keys:
                q = q.options(joinedload("keys"))

            if dbid:
                q = q.filter(DmailAddress.id == dbid)
            elif site_key:
                q = q.filter(DmailAddress.site_key == site_key)
            else:
                raise Exception("Either dbid or site_key must be specified.")

            dmail_address = q.first()

            if process_call(sess, dmail_address):
                sess.expire_on_commit = False
                sess.commit()

            sess.expunge_all()

            return dmail_address

    dmail_address = \
        yield from dispatcher.node.loop.run_in_executor(None, dbcall)

    return dmail_address