本文整理汇总了Python中marklogic.connection.Connection类的典型用法代码示例。如果您正苦于以下问题:Python Connection类的具体用法?Python Connection怎么用?Python Connection使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Connection类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: instance_init
def instance_init(cls, host):
    """
    Perform first-time initialization of a newly installed server.

    A host that answers with a 400 "MANAGE-ALREADYINIT" error is treated
    as already initialized and is returned without raising.

    :param host: The name or IP address of the host to initialize
    :return: ``Host(host)`` when the server was already initialized,
        otherwise the result of ``Host(host)._set_just_initialized()``
    :raises UnexpectedManagementAPIResponse: re-raised when the POST fails
        for any reason other than the host already being initialized, and
        raised when the response status is not 202
    """
    conn = Connection(host, None)
    uri = "{0}://{1}:8001/admin/v1/init".format(conn.protocol, conn.host)

    logger = logging.getLogger("marklogic")
    logger.debug("Initializing %s", host)

    # This call is a little odd; we special case the 400 error that
    # occurs if the host has already been initialized.
    try:
        response = conn.post(uri,
                             content_type='application/x-www-form-urlencoded')
    except UnexpectedManagementAPIResponse:
        response = conn.response
        if response.status_code == 400:
            # The error body is not guaranteed to be well-formed JSON; a
            # parse failure must not mask the exception being handled.
            try:
                err = json.loads(response.text)
            except ValueError:
                err = None
            detail = err.get("errorResponse") if isinstance(err, dict) else None
            if (isinstance(detail, dict)
                    and detail.get("messageCode") == "MANAGE-ALREADYINIT"):
                return Host(host)
        # Anything else is a genuine failure: propagate the original error.
        raise

    if response.status_code != 202:
        raise UnexpectedManagementAPIResponse(response.text)
    return Host(host)._set_just_initialized()
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:32,代码来源:__init__.py
示例2: instance_admin
def instance_admin(cls, host, realm, admin, password):
    """
    Set up the security database of a newly initialized server.

    Posts the admin credentials and realm to the instance-admin endpoint,
    then waits for the server to restart.

    :param host: The name or IP address of the host to initialize
    :param realm: The security realm to install
    :param admin: The name of the admin user
    :param password: The password of the admin user
    :raises UnexpectedManagementAPIResponse: if the response status is
        not 202
    """
    anon_conn = Connection(host, None)
    endpoint = "{0}://{1}:8001/admin/v1/instance-admin".format(
        anon_conn.protocol, anon_conn.host)
    body = {
        'admin-username': admin,
        'admin-password': password,
        'realm': realm
    }
    json_headers = {'content-type': 'application/json',
                    'accept': 'application/json'}

    log = logging.getLogger("marklogic")
    log.debug("Initializing security for {0}".format(host))

    # N.B. requests.post is called directly instead of the connection's
    # post() because no authentication is needed at this point.
    reply = requests.post(endpoint, json=body, headers=json_headers)
    if reply.status_code != 202:
        raise UnexpectedManagementAPIResponse(reply.text)

    # From now on connections require auth...
    auth_conn = Connection(host, HTTPDigestAuth(admin, password))
    restart_info = json.loads(reply.text)
    last_startup = restart_info["restart"]["last-startup"][0]["value"]
    auth_conn.wait_for_restart(last_startup)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:35,代码来源:__init__.py
示例3: _get_server_config
def _get_server_config(self):
    """
    Fetch this host's server configuration.

    The configuration is the data needed for the first step of the
    handshake that joins a host to a cluster. It is not meant to be
    inspected by callers.

    :return: The config. This is always XML.
    :raises UnexpectedManagementAPIResponse: if the response status is
        not 200
    """
    conn = Connection(self.host_name(), None)
    # NOTE(review): the scheme is fixed to "http" here -- confirm whether
    # it should follow the connection's protocol instead.
    endpoint = "http://{0}:8001/admin/v1/server-config".format(conn.host)
    reply = conn.get(endpoint, accept="application/xml")
    if reply.status_code == 200:
        return reply.text  # this is always XML
    raise UnexpectedManagementAPIResponse(reply.text)
开发者ID:JamFuller,项目名称:python_api,代码行数:15,代码来源:host.py
示例4: test_create
def test_create(self):
    """
    Create a certificate authority from a PEM string and check it.

    The authority is deleted in a ``finally`` clause so that a failed
    assertion does not leave it behind on the server.
    """
    pem = ("-----BEGIN CERTIFICATE-----\n"
           "MIIC3TCCAkYCCQCJtpKDQbobyTANBgkqhkiG9w0BAQsFADCBsjELMAkGA1UEBhMC\n"
           "VVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZBdXN0aW4xHjAcBgNVBAoMFU1hcmtM\n"
           "b2dpYyBDb3Jwb3JhdGlvbjEXMBUGA1UECwwOVFggRW5naW5lZXJpbmcxITAfBgNV\n"
           "BAMMGE1hcmtMb2dpYyBUWCBFbmdpbmVlcmluZzEpMCcGCSqGSIb3DQEJARYabm9y\n"
           "bWFuLndhbHNoQG1hcmtsb2dpYy5jb20wHhcNMTQwODI3MTkyMzQyWhcNMTUwODI3\n"
           "MTkyMzQyWjCBsjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB\n"
           "dXN0aW4xHjAcBgNVBAoMFU1hcmtMb2dpYyBDb3Jwb3JhdGlvbjEXMBUGA1UECwwO\n"
           "VFggRW5naW5lZXJpbmcxITAfBgNVBAMMGE1hcmtMb2dpYyBUWCBFbmdpbmVlcmlu\n"
           "ZzEpMCcGCSqGSIb3DQEJARYabm9ybWFuLndhbHNoQG1hcmtsb2dpYy5jb20wgZ8w\n"
           "DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAJSo3wFMDvTV7Q+4NDDMu9aJZ6uK4l8b\n"
           "ACIk5/Ug+MoST+CuIfeBlb2Y6dxNCwkADwVPpykslDcHYFygxFIcnHHVhgqZ0xzu\n"
           "LPXBagXmHyj+mb6im1tkbqAxQ7gj/SDeCnQYRIwNRlGgWZJFViaYJH3CC8G/f16F\n"
           "IhDyQS3h28W3AgMBAAEwDQYJKoZIhvcNAQELBQADgYEAWbidV4huPlf8Ac0c3Cbs\n"
           "Nx2xogODSjNPKqwug0Y3jKx33uxeY7i9oParWSnVFkG0JYUZEfrO5fmtS6JSA1Lk\n"
           "e3BioC9xgclEYFiDoZSARasL8hdNvu7v+EYZEnS43rR4M7CQiq/Tf50o4VjiVM9S\n"
           "I0Bo+VZSaShQKipBEHS8sP8=\n"
           "-----END CERTIFICATE-----\n")
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    cert = Authority.create(connection, pem)
    # Checked before the try block: without a cert there is nothing to
    # clean up, and cert.delete() would fail on None.
    self.assertIsNotNone(cert)
    try:
        self.assertEqual('true', cert.enabled())
        self.assertIsNotNone(cert.properties())
    finally:
        cert.delete(connection)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:29,代码来源:test_authorities.py
示例5: test_load
def test_load(self):
    """Read the "Manage" HTTP server and check its name and type."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    manage = HttpServer("Manage", "Default")
    self.assertEqual(manage.server_name(), "Manage")
    # read() must return something for an existing server.
    loaded = manage.read(conn)
    self.assertIsNotNone(loaded)
    self.assertEqual("http", manage.server_type())
开发者ID:Annakan,项目名称:python_api,代码行数:7,代码来源:test_server.py
示例6: test_lookup
def test_lookup(self):
    """Look up the "Manage" server by name and check the result."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    found = Server.lookup(conn, "Manage")
    self.assertIsNotNone(found)
    self.assertEqual(found.server_name(), "Manage")
开发者ID:Annakan,项目名称:python_api,代码行数:7,代码来源:test_server.py
示例7: test_list
def test_list(self):
    """List the roles and check that "admin" is among them."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    role_names = Role.list(conn)
    # NOTE(review): 65 is presumably a lower bound on the number of
    # built-in roles -- confirm.
    role_count = len(role_names)
    self.assertGreater(role_count, 65)
    self.assertIn("admin", role_names)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:7,代码来源:test_role.py
示例8: test_lookup
def test_lookup(self):
    """Look up the "admin" role by name and check the result."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    found = Role.lookup(conn, "admin")
    self.assertIsNotNone(found)
    self.assertEqual(found.role_name(), "admin")
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:7,代码来源:test_role.py
示例9: test_template
def test_template(self):
    """
    Exercise create, list, update, lookup and delete for a template.

    The template is deleted in a ``finally`` clause so that a failed
    assertion does not leave it behind on the server.
    """
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    # NOTE(review): this literal previously read "[email protected]", an
    # e-mail obfuscation placeholder rather than a valid address. A
    # reserved example.com address is used instead -- confirm against the
    # upstream test if the exact value matters.
    req = Request(countryName="US", stateOrProvinceName="TX",
                  localityName="Austin", organizationName="MarkLogic",
                  emailAddress="test@example.com",
                  version=0)
    temp = Template("Test Template", "Test description", req)
    self.assertEqual("Test Template", temp.template_name())

    temp.create(connection)
    try:
        names = Template.list(connection)
        self.assertGreater(len(names), 0)
        self.assertIn(temp.template_id(), names)

        temp.set_template_name("New Name")
        temp.set_template_description("New Description")
        temp.update(connection)
        self.assertIsNotNone(temp)

        newtemp = Template.lookup(connection, temp.template_id())
        # Fail with a clear assertion rather than an AttributeError on the
        # next line if the lookup finds nothing.
        self.assertIsNotNone(newtemp)
        self.assertEqual(temp.template_name(), newtemp.template_name())
    finally:
        temp.delete(connection)
    self.assertIsNotNone(temp)
开发者ID:Annakan,项目名称:python_api,代码行数:29,代码来源:test_cert_templates.py
示例10: test_list
def test_list(self):
    """List the privileges and check that "execute|manage-admin" is present."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    listed = Privilege.list(conn)
    # NOTE(review): 300 is presumably a lower bound on the number of
    # built-in privileges -- confirm.
    privilege_count = len(listed)
    self.assertGreater(privilege_count, 300)
    self.assertIn("execute|manage-admin", listed)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:7,代码来源:test_privilege.py
示例11: test_lookup
def test_lookup(self):
    """Look up the "manage-admin" execute privilege and check its name."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    found = Privilege.lookup(conn, "manage-admin", "execute")
    self.assertIsNotNone(found)
    self.assertEqual(found.privilege_name(), "manage-admin")
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:7,代码来源:test_privilege.py
示例12: test_lookup
def test_lookup(self):
    """Look up the "nobody" user by name and check the result."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    found = User.lookup(conn, "nobody")
    self.assertIsNotNone(found)
    self.assertEqual(found.user_name(), "nobody")
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:7,代码来源:test_user.py
示例13: test_list
def test_list(self):
    """List the users and check that "nobody" is among them."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    user_names = User.list(conn)
    user_count = len(user_names)
    self.assertGreater(user_count, 2)
    self.assertIn("nobody", user_names)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:7,代码来源:test_user.py
示例14: test_lookup_action
def test_lookup_action(self):
    """Look up an execute privilege by its action URI and check its name."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    action_uri = "http://marklogic.com/xdmp/privileges/admin-module-write"
    found = Privilege.lookup(conn, kind="execute", action=action_uri)
    self.assertIsNotNone(found)
    self.assertEqual(found.privilege_name(), "admin-module-write")
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:8,代码来源:test_privilege.py
示例15: test_lookup
def test_lookup(self):
    """Look up the first listed certificate authority and check its id."""
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    names = Authority.list(connection)
    # Report an empty list as a test failure instead of letting names[0]
    # raise an IndexError.
    self.assertGreater(len(names), 0)
    auth = Authority.lookup(connection, names[0])
    self.assertIsNotNone(auth)
    self.assertEqual(auth.certificate_id(), names[0])
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:8,代码来源:test_authorities.py
示例16: _test_ssl_certificate_pems
def _test_ssl_certificate_pems(self):
    """
    Add and replace SSL client certificate PEMs on a temporary server.

    The server is deleted in a ``finally`` clause so that a failed
    assertion does not leave "foo-http" behind on the server.
    """
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    server = HttpServer("foo-http", "Default", 10101, '/', 'Documents')
    self.assertEqual(server.server_name(), "foo-http")

    # The PEMs are written as adjacent string literals (not backslash
    # continuations) so that indenting this method cannot leak leading
    # whitespace into the certificate text.
    pem1 = ("-----BEGIN CERTIFICATE-----\n"
            "MIICgjCCAeugAwIBAgIBBDANBgkqhkiG9w0BAQQFADBTMQswCQYDVQQGEwJVUzEc\n"
            "MBoGA1UEChMTRXF1aWZheCBTZWN1cmUgSW5jLjEmMCQGA1UEAxMdRXF1aWZheCBT\n"
            "ZWN1cmUgZUJ1c2luZXNzIENBLTEwHhcNOTkwNjIxMDQwMDAwWhcNMjAwNjIxMDQw\n"
            "MDAwWjBTMQswCQYDVQQGEwJVUzEcMBoGA1UEChMTRXF1aWZheCBTZWN1cmUgSW5j\n"
            "LjEmMCQGA1UEAxMdRXF1aWZheCBTZWN1cmUgZUJ1c2luZXNzIENBLTEwgZ8wDQYJ\n"
            "KoZIhvcNAQEBBQADgY0AMIGJAoGBAM4vGbwXt3fek6lfWg0XTzQaDJj0ItlZ1MRo\n"
            "RvC0NcWFAyDGr0WlIVFFQesWWDYyb+JQYmT5/VGcqiTZ9J2DKocKIdMSODRsjQBu\n"
            "WqDZQu4aIZX5UkxVWsUPOE9G+m34LjXWHXzr4vCwdYDIqROsvojvOm6rXyo4YgKw\n"
            "Env+j6YDAgMBAAGjZjBkMBEGCWCGSAGG+EIBAQQEAwIABzAPBgNVHRMBAf8EBTAD\n"
            "AQH/MB8GA1UdIwQYMBaAFEp4MlIR21kWNl7fwRQ2QGpHfEyhMB0GA1UdDgQWBBRK\n"
            "eDJSEdtZFjZe38EUNkBqR3xMoTANBgkqhkiG9w0BAQQFAAOBgQB1W6ibAxHm6VZM\n"
            "zfmpTMANmvPMZWnmJXbMWbfWVMMdzZmsGd20hdXgPfxiIKeES1hl8eL5lSE/9dR+\n"
            "WB5Hh1Q+WKG1tfgq73HnvMP2sUlG4tega+VWeponmHxGYhTnyfxuAxJ5gDgdSIKN\n"
            "/Bf+KpYrtWKmpj29f5JZzVoqgrI3eQ==\n"
            "-----END CERTIFICATE-----")
    pem2 = ("-----BEGIN CERTIFICATE-----\n"
            "MIICkDCCAfmgAwIBAgIBATANBgkqhkiG9w0BAQQFADBaMQswCQYDVQQGEwJVUzEc\n"
            "MBoGA1UEChMTRXF1aWZheCBTZWN1cmUgSW5jLjEtMCsGA1UEAxMkRXF1aWZheCBT\n"
            "ZWN1cmUgR2xvYmFsIGVCdXNpbmVzcyBDQS0xMB4XDTk5MDYyMTA0MDAwMFoXDTIw\n"
            "MDYyMTA0MDAwMFowWjELMAkGA1UEBhMCVVMxHDAaBgNVBAoTE0VxdWlmYXggU2Vj\n"
            "dXJlIEluYy4xLTArBgNVBAMTJEVxdWlmYXggU2VjdXJlIEdsb2JhbCBlQnVzaW5l\n"
            "c3MgQ0EtMTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAuucXkAJlsTRVPEnC\n"
            "UdXfp9E3j9HngXNBUmCbnaEXJnitx7HoJpQytd4zjTov2/KaelpzmKNc6fuKcxtc\n"
            "58O/gGzNqfTWK8D3+ZmqY6KxRwIP1ORROhI8bIpaVIRw28HFkM9yRcuoWcDNM50/\n"
            "o5brhTMhHD4ePmBudpxnhcXIw2ECAwEAAaNmMGQwEQYJYIZIAYb4QgEBBAQDAgAH\n"
            "MA8GA1UdEwEB/wQFMAMBAf8wHwYDVR0jBBgwFoAUvqigdHJQa0S3ySPY+6j/s1dr\n"
            "aGwwHQYDVR0OBBYEFL6ooHRyUGtEt8kj2Puo/7NXa2hsMA0GCSqGSIb3DQEBBAUA\n"
            "A4GBADDiAVGqx+pf2rnQZQ8w1j7aDRRJbpGTJxQx78T3LUX47Me/okENI7SS+RkA\n"
            "Z70Br83gcfxaz2TE4JaY0KNA4gGK7ycH8WUBikQtBmV1UsCGECAhX2xrD2yuCRyv\n"
            "8qIYNMR1pHMc8Y3c7635s3a0kr/clRAevsvIO1qEYBlWlKlV\n"
            "-----END CERTIFICATE-----")

    server.create(connection)
    try:
        self.assertIsNotNone(server)
        self.assertEqual("http", server.server_type())

        # A single certificate added incrementally...
        server.add_ssl_client_certificate_pem(pem1)
        server.update(connection)
        server.read(connection)
        self.assertEqual(1, len(server.ssl_client_certificate_pems()))

        # ...then the whole list replaced at once.
        server.set_ssl_client_certificate_pems([pem1, pem2])
        server.update(connection)
        server.read(connection)
        self.assertEqual(2, len(server.ssl_client_certificate_pems()))
    finally:
        server.delete(connection)

    server = Server.lookup(connection, "foo-http")
    self.assertIsNone(server)
开发者ID:Annakan,项目名称:python_api,代码行数:57,代码来源:test_server.py
示例17: test_create_webdav_server
def test_create_webdav_server(self):
    """
    Create a WebDAV server, check its type, delete it and confirm removal.

    The server is deleted in a ``finally`` clause so that a failed
    assertion does not leave "foo-webdav" behind on the server.
    """
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    server = WebDAVServer("foo-webdav", "Default", 10101, '/', 'Documents')
    self.assertEqual(server.server_name(), "foo-webdav")

    server.create(connection)
    try:
        self.assertIsNotNone(server)
        self.assertEqual("webdav", server.server_type())
    finally:
        server.delete(connection)

    server = Server.lookup(connection, "foo-webdav")
    self.assertIsNone(server)
开发者ID:Annakan,项目名称:python_api,代码行数:10,代码来源:test_server.py
示例18: test_list
def test_list(self):
    """List the authorities with names and check one mentions "Equifax"."""
    conn = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    authority_names = Authority.list(conn, include_names=True)
    self.assertGreater(len(authority_names), 100)
    # True as soon as any listed entry contains the substring.
    has_equifax = any("Equifax" in entry for entry in authority_names)
    self.assertEqual(True, has_equifax)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:10,代码来源:test_authorities.py
示例19: test_save_privilege
def test_save_privilege(self):
    """
    Create a privilege, add a role name to it, update it and check the role.

    The privilege is deleted in a ``finally`` clause so that a failed
    assertion does not leave "foo-privilege" behind on the server.
    """
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    privilege = Privilege("foo-privilege", "http://example.com/", "execute")

    privilege.create(connection)
    try:
        privilege.add_role_name("manage-user")
        privilege.update(connection)
        self.assertIn("manage-user", privilege.role_names())
    finally:
        privilege.delete(connection)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:11,代码来源:test_privilege.py
示例20: test_create_privilege
def test_create_privilege(self):
    """
    Create a privilege and check that it appears in the privilege list.

    The privilege is deleted in a ``finally`` clause so that a failed
    assertion does not leave "foo-privilege" behind on the server.
    """
    connection = Connection.make_connection(tc.hostname, tc.admin, tc.password)
    new_privilege = Privilege("foo-privilege", "http://example.com/", "execute")
    self.assertEqual(new_privilege.privilege_name(), "foo-privilege")

    new_privilege.create(connection)
    try:
        privileges = Privilege.list(connection)
        self.assertIn("execute|foo-privilege", privileges)
    finally:
        new_privilege.delete(connection)
开发者ID:ndw,项目名称:python_api_TEMP,代码行数:12,代码来源:test_privilege.py
注:本文中的marklogic.connection.Connection类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论