本文整理汇总了Python中os.environ.get函数的典型用法代码示例。如果您正苦于以下问题:Python get函数的具体用法?Python get怎么用?Python get使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _send_slack_alert
def _send_slack_alert(self, message, service, color='green', sender='Cabot'):
channel = '#' + env.get('SLACK_ALERT_CHANNEL')
url = env.get('SLACK_WEBHOOK_URL')
icon_url = env.get('SLACK_ICON_URL')
# TODO: handle color
resp = requests.post(url, data=json.dumps({
'channel': channel,
'username': sender[:15],
'icon_url': icon_url,
'attachments': [{
'title': service.name,
'text': message,
'color': color,
'fields': [{
'title': 'status',
'value': service.overall_status,
'short': 'false'
}, {
'title': 'old status',
'value': service.old_overall_status,
'short': 'false'
}
]
}]
}))
开发者ID:tmeinlschmidt,项目名称:cabot-alert-slack,代码行数:27,代码来源:models.py
示例2: get_backend_api
def get_backend_api(test_case, cluster_id):
"""
Get an appropriate BackendAPI for the specified dataset backend.
Note this is a backdoor that is useful to be able to interact with cloud
APIs in tests. For many dataset backends this does not make sense, but it
provides a convenient means to interact with cloud backends such as EBS or
cinder.
:param test_case: The test case that is being run.
:param cluster_id: The unique cluster_id, used for backend APIs that
require this in order to be constructed.
"""
backend_config_filename = environ.get(
"FLOCKER_ACCEPTANCE_TEST_VOLUME_BACKEND_CONFIG")
if backend_config_filename is None:
raise SkipTest(
'This test requires the ability to construct an IBlockDeviceAPI '
'in order to verify construction. Please set '
'FLOCKER_ACCEPTANCE_TEST_VOLUME_BACKEND_CONFIG to a yaml filepath '
'with the dataset configuration.')
backend_name = environ.get("FLOCKER_ACCEPTANCE_VOLUME_BACKEND")
if backend_name is None:
raise SkipTest(
"Set acceptance testing volume backend using the " +
"FLOCKER_ACCEPTANCE_VOLUME_BACKEND environment variable.")
backend_config_filepath = FilePath(backend_config_filename)
full_backend_config = yaml.safe_load(
backend_config_filepath.getContent())
backend_config = full_backend_config.get(backend_name)
if 'backend' in backend_config:
backend_config.pop('backend')
backend = get_backend(backend_name)
return get_api(backend, pmap(backend_config), reactor, cluster_id)
开发者ID:james-w,项目名称:flocker,代码行数:35,代码来源:testtools.py
示例3: application
def application(config):
app = Application("Scrapyd")
http_port = int(environ.get('PORT', config.getint('http_port', 6800)))
config.cp.set('scrapyd', 'database_url', environ.get('DATABASE_URL'))
poller = Psycopg2QueuePoller(config)
eggstorage = FilesystemEggStorage(config)
scheduler = Psycopg2SpiderScheduler(config)
environment = Environment(config)
app.setComponent(IPoller, poller)
app.setComponent(IEggStorage, eggstorage)
app.setComponent(ISpiderScheduler, scheduler)
app.setComponent(IEnvironment, environment)
launcher = Launcher(config, app)
timer = TimerService(5, poller.poll)
webservice = TCPServer(http_port, server.Site(Root(config, app)))
log.msg("Scrapyd web console available at http://localhost:%s/ (HEROKU)"
% http_port)
launcher.setServiceParent(app)
timer.setServiceParent(app)
webservice.setServiceParent(app)
return app
开发者ID:kayawaffles,项目名称:scrapy-heroku,代码行数:26,代码来源:app.py
示例4: __init__
def __init__(self, api_key=environ.get('PIZ_GOOGLE_API_KEY'), cx=environ.get('PIZ_GOOGLE_SEARCH_CX')):
if api_key is None or cx is None:
raise UserMisconfigurationError('You must have both PIZ_GOOGLE_API_KEY and PIZ_GOOGLE_SEARCH_CX set as '
'environment variables in your shell.')
self.api_key = api_key
self.cx = cx
self.service = build('customsearch', 'v1', developerKey=api_key)
开发者ID:mmedal,项目名称:piz,代码行数:7,代码来源:search.py
示例5: test_returned_filesize
def test_returned_filesize():
runner = CliRunner()
result = runner.invoke(
cli,
['search',
environ.get('SENTINEL_USER'),
environ.get('SENTINEL_PASSWORD'),
'tests/map.geojson',
'--url', 'https://scihub.copernicus.eu/dhus/',
'-s', '20141205',
'-e', '20141208',
'-q', 'producttype=GRD']
)
expected = "1 scenes found with a total size of 0.50 GB"
assert result.output.split("\n")[-2] == expected
result = runner.invoke(
cli,
['search',
environ.get('SENTINEL_USER'),
environ.get('SENTINEL_PASSWORD'),
'tests/map.geojson',
'--url', 'https://scihub.copernicus.eu/dhus/',
'-s', '20140101',
'-e', '20141231',
'-q', 'producttype=GRD']
)
expected = "20 scenes found with a total size of 11.06 GB"
assert result.output.split("\n")[-2] == expected
开发者ID:hexgis,项目名称:sentinelsat,代码行数:30,代码来源:test_cli.py
示例6: send_alert
def send_alert(self, service, users, duty_officers):
account_sid = env.get('TWILIO_ACCOUNT_SID')
auth_token = env.get('TWILIO_AUTH_TOKEN')
outgoing_number = env.get('TWILIO_OUTGOING_NUMBER')
all_users = list(users) + list(duty_officers)
client = TwilioRestClient(
account_sid, auth_token)
mobiles = TwilioUserData.objects.filter(user__user__in=all_users)
mobiles = [m.prefixed_phone_number for m in mobiles if m.phone_number]
c = Context({
'service': service,
'host': settings.WWW_HTTP_HOST,
'scheme': settings.WWW_SCHEME,
})
message = Template(sms_template).render(c)
for mobile in mobiles:
try:
client.sms.messages.create(
to=mobile,
from_=outgoing_number,
body=message,
)
except Exception, e:
logger.exception('Error sending twilio sms: %s' % e)
开发者ID:dhobbs,项目名称:cabot-alert-irc,代码行数:27,代码来源:models.py
示例7: users_added_callback
def users_added_callback(self,users):
'''
Server url will be dynamic to work same code on different servers
in case on user enrollment and iOS profile generation also.
'''
loader = Loader("/opt/toppatch/mv/media/app/")
server_url = environ.get('SERVER_CNAME')
ses_conn = ses.connect_to_region('us-east-1',
aws_access_key_id=environ.get('AWS_SES_ACCESS_KEY_ID'),
aws_secret_access_key=environ.get(
'AWS_SES_SECRET_ACCESS_KEY'))
for user in users:
link = str(server_url) + '/enroll/'+str(user.get('enrollment_id'))
message = loader.load('user_enroll_mail.html').generate(
company_name=user.get('company_name'),
user_passwd=user.get('passcode'), activation_link=link)
# message = 'Your verification \
# link is : {0} and enrollment password is {1} . To ensure \
# your device os please open this link in your device \
# browser only. :)'.format(
# str(server_url) + '/enroll/'+str(user['enrollment_id']), user['passcode'])
#message = message.replace(' ', '')
try:
ses_conn.send_email('[email protected]',
'MDM Enrollment verification', message,
[user['email']], format='html')
except Exception,err:
print repr(err)
开发者ID:aman-gautam,项目名称:mv-server,代码行数:30,代码来源:batch_insert.py
示例8: agent_main
def agent_main(collector):
to_file(sys.stdout)
startLogging(sys.stdout)
return react(
run_agent, [
environ.get(
"FLOCKER_CONFIGURATION_PATH",
"/etc/flocker",
).decode("ascii"),
environ.get(
"CATALOG_FIREHOSE_PROTOCOL",
DEFAULT_FIREHOSE_PROTOCOL,
).decode("ascii"),
environ.get(
"CATALOG_FIREHOSE_HOSTNAME",
DEFAULT_FIREHOSE_HOSTNAME,
).decode("ascii"),
int(
environ.get(
"CATALOG_FIREHOSE_PORT",
unicode(DEFAULT_FIREHOSE_PORT).encode("ascii"),
).decode("ascii")
),
# Base64 encoded
environ["CATALOG_FIREHOSE_SECRET"].decode("ascii"),
collector,
],
)
开发者ID:ClusterHQ,项目名称:volume-hub-catalog-agents,代码行数:28,代码来源:agentlib.py
示例9: setUpClass
def setUpClass(cls):
cls.token = environ.get('KB_AUTH_TOKEN', None)
# WARNING: don't call any logging methods on the context object,
# it'll result in a NoneType error
cls.ctx = MethodContext(None)
cls.ctx.update({'token': cls.token,
'provenance': [
{'service': 'ReadsUtils',
'method': 'please_never_use_it_in_production',
'method_params': []
}],
'authenticated': 1})
config_file = environ.get('KB_DEPLOYMENT_CONFIG', None)
cls.cfg = {}
config = ConfigParser()
config.read(config_file)
for nameval in config.items('ReadsUtils'):
cls.cfg[nameval[0]] = nameval[1]
cls.shockURL = cls.cfg['shock-url']
cls.ws = Workspace(cls.cfg['workspace-url'], token=cls.token)
cls.impl = ReadsUtils(cls.cfg)
shutil.rmtree(cls.cfg['scratch'])
os.mkdir(cls.cfg['scratch'])
suffix = int(time.time() * 1000)
wsName = "test_ReadsUtils_" + str(suffix)
cls.ws_info = cls.ws.create_workspace({'workspace': wsName})
cls.dfu = DataFileUtil(os.environ['SDK_CALLBACK_URL'], token=cls.token)
开发者ID:msneddon,项目名称:ReadsUtils,代码行数:27,代码来源:ReadsUtils_server_test.py
示例10: load_env
def load_env(app):
if 'DATABASE_URI' in environ: app.config['DATABASE_URI'] = environ.get('DATABASE_URI')
if 'INSTA_ID' in environ: app.config['INSTA_ID'] = environ.get('INSTA_ID')
if 'INSTA_SECRET' in environ: app.config['INSTA_SECRET'] = environ.get('INSTA_SECRET')
if 'SECRET_KEY' in environ: app.config['SECRET_KEY'] = environ.get('SECRET_KEY')
开发者ID:gopito,项目名称:commonview,代码行数:7,代码来源:env_settings.py
示例11: run
def run():
"""
Run the server.
"""
# Set up the logger.
if not os.path.isdir(os.path.join(script_dir, 'logs')):
os.makedirs(os.path.join(script_dir, 'logs'))
# Format the logs.
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Enable the logs to split files at midnight.
handler = TimedRotatingFileHandler(
os.path.join(script_dir, 'logs', 'TorSpider.log'),
when='midnight', backupCount=7, interval=1)
handler.setLevel(app.config['LOG_LEVEL'])
handler.setFormatter(formatter)
log = logging.getLogger('werkzeug')
log.setLevel(app.config['LOG_LEVEL'])
log.addHandler(handler)
app.logger.addHandler(handler)
app.logger.setLevel(app.config['APP_LOG_LEVEL'])
# Set up the app server, port, and configuration.
port = int(environ.get('PORT', app.config['LISTEN_PORT']))
addr = environ.get('LISTEN_ADDR', app.config['LISTEN_ADDR'])
if app.config['USETLS']:
context = (app.config['CERT_FILE'], app.config['CERT_KEY_FILE'])
app.run(host=addr, port=port, threaded=True, ssl_context=context)
else:
app.run(host=addr, port=port, threaded=True)
开发者ID:TorSpider,项目名称:TorSpider-Frontend,代码行数:31,代码来源:frontend_manage.py
示例12: memcacheify
def memcacheify(timeout=500):
"""Return a fully configured Django ``CACHES`` setting. We do this by
analyzing all environment variables on Heorku, scanning for an available
memcache addon, and then building the settings dict properly.
If no memcache servers can be found, we'll revert to building a local
memory cache.
Returns a fully configured caches dict.
"""
caches = {}
if all((environ.get(e, "") for e in MEMCACHE_ENV_VARS)):
caches["default"] = CACHE_DEFAULTS
caches["default"].update({"LOCATION": "localhost:11211", "TIMEOUT": timeout})
elif all((environ.get(e, "") for e in MEMCACHIER_ENV_VARS)):
servers = environ.get("MEMCACHIER_SERVERS").replace(",", ";")
environ["MEMCACHE_SERVERS"] = servers
environ["MEMCACHE_USERNAME"] = environ.get("MEMCACHIER_USERNAME")
environ["MEMCACHE_PASSWORD"] = environ.get("MEMCACHIER_PASSWORD")
caches["default"] = CACHE_DEFAULTS
caches["default"].update({"LOCATION": servers, "TIMEOUT": timeout})
elif all((environ.get(e, "") for e in MEMCACHEDCLOUD_ENV_VARS)):
servers = environ.get("MEMCACHEDCLOUD_SERVERS").replace(",", ";")
environ["MEMCACHE_SERVERS"] = servers
environ["MEMCACHE_USERNAME"] = environ.get("MEMCACHEDCLOUD_USERNAME")
environ["MEMCACHE_PASSWORD"] = environ.get("MEMCACHEDCLOUD_PASSWORD")
caches["default"] = CACHE_DEFAULTS
caches["default"].update({"LOCATION": servers, "TIMEOUT": timeout})
elif environ.get("MEMCACHEIFY_USE_LOCAL", False):
caches["default"] = {"BACKEND": "django_pylibmc.memcached.PyLibMCCache"}
else:
caches["default"] = {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
return caches
开发者ID:edmorley,项目名称:django-heroku-memcacheify,代码行数:35,代码来源:memcacheify.py
示例13: setUpClass
def setUpClass(cls):
token = environ.get('KB_AUTH_TOKEN', None)
cls.ctx = {'token': token, 'provenance': [{'service': 'data_api2',
'method': 'please_never_use_it_in_production', 'method_params': []}],
'authenticated': 1}
config_file = environ.get('KB_DEPLOYMENT_CONFIG', None)
cls.cfg = {}
config = ConfigParser()
config.read(config_file)
for nameval in config.items('data_api2'):
cls.cfg[nameval[0]] = nameval[1]
cls.wsURL = cls.cfg['workspace-url']
cls.wsClient = workspaceService(cls.wsURL, token=token)
cls.serviceImpl = data_api2(cls.cfg)
cls.obj_name="ReferenceGenomeAnnotations/kb|g.207118"
cls.obj_name="ReferenceGenomeAnnotations/kb|g.217864"
cls.feature='kb|g.207118.CDS.3237'
cls.feature='kb|g.217864.CDS.11485'
cls.gene='kb|g.217864.locus.10619'
cls.obj_name="ReferenceGenomeAnnotations/kb|g.140057"
cls.feature='kb|g.140057.CDS.2901'
cls.gene='kb|g.140057.locus.2922'
cls.mrna='kb|g.140057.mRNA.2840'
cls.taxon= u'1779/523209/1'
cls.assembly='1837/56/1'
开发者ID:scanon,项目名称:data_api2,代码行数:27,代码来源:data_api2_server_test.py
示例14: run
def run(self, args):
self.check_not_docsearch_app_id('run a config manually')
self.exec_shell_command(["docker", "stop", "documentation-scrapper-dev"])
self.exec_shell_command(["docker", "rm", "documentation-scrapper-dev"])
f = open(args[0], 'r')
config = f.read()
run_command = [
'docker',
'run',
'-e',
'APPLICATION_ID=' + environ.get('APPLICATION_ID'),
'-e',
'API_KEY=' + environ.get('API_KEY'),
'-e',
"CONFIG=" + config,
'-v',
getcwd() + '/scraper/src:/root/src',
'--name',
'documentation-scrapper-dev',
'-t',
'algolia/documentation-scrapper-dev',
'/root/run'
]
return self.exec_shell_command(run_command)
开发者ID:algolia,项目名称:docsearch-scraper,代码行数:28,代码来源:run_config_docker.py
示例15: get_runtime_paths
def get_runtime_paths(self, what):
"""
Determine what components of LD_LIBRARY_PATH are necessary to run `what`
"""
from os import environ
from os.path import dirname
self.env.stash()
self.env.LD_LIBRARY_PATH = environ.get("LD_LIBRARY_PATH", "")
try:
# Run ldd
out, err = self.bld.cmd_and_log([self.env.LDD, what],
output=waflib.Context.BOTH)
finally:
self.env.revert()
# Parse ldd output to determine what paths are used by the dynamic linker
maybe_paths = set()
for line in out.split("\n"):
parts = line.split()
if not parts: continue
if parts[1] == "=>": maybe_paths.add(dirname(parts[2]))
return maybe_paths & set(environ.get("LD_LIBRARY_PATH", "").split(":"))
开发者ID:pwaller,项目名称:diphoton-analysis,代码行数:25,代码来源:compiler_magic.py
示例16: main
def main(styles_list):
from os import environ
##### Temp ####
environ['IMGDIR'] = '/mnt/images'
## ************* ##
imgnum = int(environ.get('IMGNUM', 0))
root_dir = environ.get('IMGDIR', '/mnt/Post_Complete/MozuRoot')
ext = environ.get('IMGEXT','.png')
## Do for only 1 img number or load any that are found
if imgnum > 0:
flist = create_list_files_to_send(styles_list, imgnum=imgnum, ext=ext, root_dir=root_dir)
else:
flist = []
for x in range(1,7,1):
li1 = create_list_files_to_send(styles_list, imgnum=x, ext=ext, root_dir=root_dir)
[ flist.append(f) for f in li1 if f is not None ]
####
## Compile Actual Styles and Filename found and Ready to Send
loaded_filenames = [f.split('/')[-1].split('.')[0] for f in flist if f is not None]
loaded_styles = list(set(sorted([fn[:9] for fn in loaded_filenames if fn is not None])))
print 'loaded styles', loaded_styles
### Send Collected to Mozu
import mozu_exec
print('Starting.\nReloading {0} Images for {1} Styles from {2} to Mozu'.format(ext.lstrip('.').upper(), len(styles_list), root_dir))
mozu_exec.main(flist)
print('Finished.\nReloaded {0} Images for {1} Styles\n{2} Total Files from {3} to Mozu'.format(ext.lstrip('.').upper(), len(loaded_styles), len(loaded_filenames),root_dir))
## Set Media Version for Loaded Styles
import media_version_ctrl
media_version_ctrl.batch_process_by_style_list(loaded_styles)
print('Finished.\nMedia Incr for {1} Styles from {2} to Mozu'.format(ext.lstrip('.').upper(), len(loaded_styles), root_dir))
## Return File Path of Loaded Styles for further ops if needed
return loaded_filenames
开发者ID:relic7,项目名称:prodimages,代码行数:33,代码来源:reload_mozu_from_l3.py
示例17: determine_base_flags
def determine_base_flags():
flags = {
'libraries': [],
'include_dirs': [],
'extra_link_args': [],
'extra_compile_args': []}
if c_options['use_ios']:
sysroot = environ.get('IOSSDKROOT', environ.get('SDKROOT'))
if not sysroot:
raise Exception('IOSSDKROOT is not set')
flags['include_dirs'] += [sysroot]
flags['extra_compile_args'] += ['-isysroot', sysroot]
flags['extra_link_args'] += ['-isysroot', sysroot]
elif platform == 'darwin':
v = os.uname()
if v[2] == '13.0.0':
sysroot = ('/Applications/Xcode5-DP.app/Contents/Developer'
'/Platforms/MacOSX.platform/Developer/SDKs'
'/MacOSX10.8.sdk/System/Library/Frameworks')
else:
sysroot = ('/System/Library/Frameworks/'
'ApplicationServices.framework/Frameworks')
flags['extra_compile_args'] += ['-F%s' % sysroot]
flags['extra_link_args'] += ['-F%s' % sysroot]
return flags
开发者ID:Glebtor,项目名称:kivy,代码行数:25,代码来源:setup.py
示例18: fcgi
def fcgi():
"""<comment-ja>
外部Web Server(FastCGI) 起動処理
</comment-ja>
<comment-en>
TODO: English Comment
</comment-en>
"""
# WebServer(fcgi)
conf = ''
if env.get('KARESANSUI_CONF'): # envrion
conf = env.get('KARESANSUI_CONF')
else: #error
print >>sys.stderr, '[fcgi] Please specify the configuration file. - Please set the environment variable that "KARESANSUI_CONF".'
sys.exit(1)
config = None
if conf: # read file
_k2v = K2V(conf)
config = _k2v.read()
try:
import flup
except ImportError, e:
print >>sys.stderr, '[Error] There are not enough libraries.(fcgi) - %s' % ''.join(e.args)
traceback.format_exc()
sys.exit(1)
开发者ID:goura,项目名称:karesansui,代码行数:27,代码来源:prep.py
示例19: elb_connect
def elb_connect(region=None, *args, **kwargs):
"""Helper to connect to Amazon Web Services EC2, using identify provided
by environment, as also optional region in arguments.
"""
if not os_environ.get("AWS_ACCESS_KEY_ID", None):
raise EC2LibraryError("Environment variable AWS_ACCESS_KEY_ID is not set.")
if not os_environ.get("AWS_SECRET_ACCESS_KEY", None):
raise EC2LibraryError("Environment variable AWS_SECRET_ACCESS_KEY is not set.")
if not region:
region = env.get("ec2_region")
for reg in boto.ec2.elb.regions():
if reg.name == region:
region = reg
# region = get_region(region,
# aws_access_key_id = os_environ.get("AWS_ACCESS_KEY_ID"),
# aws_secret_access_key = os_environ.get("AWS_ACCESS_SECRET_KEY")
# )
#
connection = ELBConnection(
os_environ.get("AWS_ACCESS_KEY_ID"),
os_environ.get("AWS_SECRET_ACCESS_KEY"),
region=region,
*args,
**kwargs
)
return connection
开发者ID:ajdiaz,项目名称:mico,代码行数:29,代码来源:elb.py
示例20: get_libpaths
def get_libpaths():
"""
On AIX, the buildtime searchpath is stored in the executable.
as "loader header information".
The command /usr/bin/dump -H extracts this info.
Prefix searched libraries with LD_LIBRARY_PATH (preferred),
or LIBPATH if defined. These paths are appended to the paths
to libraries the python executable is linked with.
This mimics AIX dlopen() behavior.
"""
libpaths = environ.get("LD_LIBRARY_PATH")
if libpaths is None:
libpaths = environ.get("LIBPATH")
if libpaths is None:
libpaths = []
else:
libpaths = libpaths.split(":")
objects = get_ld_headers(executable)
for (_, lines) in objects:
for line in lines:
# the second (optional) argument is PATH if it includes a /
path = line.split()[1]
if "/" in path:
libpaths.extend(path.split(":"))
return libpaths
开发者ID:actionless,项目名称:cpython,代码行数:25,代码来源:_aix.py
注:本文中的os.environ.get函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论