Distributed Message Service (DMS)

DMS is a fully managed, high-performance message queuing service that supports normal queues, first-in-first-out (FIFO) queues, Kafka queues, and Kafka premium instances. It is compatible with HTTP and TCP and provides a flexible, reliable asynchronous communication mechanism for distributed applications.

Normal and FIFO queues offer low latency and high reliability and support dead letter messages for handling exceptions. In normal queues, partitions provide higher concurrency. Kafka queues support high-throughput and high-reliability modes. A Kafka queue is equivalent to a topic, and the system allocates its storage space and network bandwidth automatically, so you do not need to choose them.

Kafka premium instances use physically isolated computing, storage, and bandwidth resources. You can customize partitions and replicas for Kafka topics in the instances and configure the network bandwidth as required. The instances are ready to use out of the box and relieve you of deployment and O&M work so that you can focus on developing your services.

DMS Queues

A message queue is a container that receives and stores message files. By default, up to five queues can be created per project. Multiple consumers can retrieve different messages from the same queue at the same time.

List Queues

This interface is used to query all DMS Queues and to filter the output with query parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

for raw in conn.dms.queues():
    print(raw)

Create Queue

This interface is used to create a Queue with parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    'name': 'test-queue',
    'queue_mode': 'NORMAL',
    'description': 'No need for a description',
}
# create_queue returns the single Queue that was created, so no loop is needed.
queue = conn.dms.create_queue(**attrs)
print(queue)

Get Queue

This interface is used to get a Queue by ID or an instance of class Queue.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

queue = 'queue_id'
raw = conn.dms.get_queue(queue)
print(raw)

Find Queue

This interface is used to find a Queue by ID or name.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

queue = 'name_or_id'
raw = conn.dms.find_queue(name_or_id=queue)
print(raw)

Delete Queue

This interface is used to delete a Queue by ID or an instance of class Queue.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

queue_id = 'queue_id'
raw = conn.dms.delete_queue(queue_id, ignore_missing=True)
print(raw)

DMS Queue Groups

A consumer group is used to group consumers. A maximum of three consumer groups can be created in each queue. Messages in one queue can be retrieved once by each consumer group. Messages acknowledged by one consumer group are no longer available to that consumer group but still available to other consumer groups. Consumers in the same consumer group can retrieve different messages from one queue at the same time.
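The delivery rules described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the DMS implementation: the `QueueModel` class, its methods, and the group names are hypothetical and exist purely for illustration.

```python
class QueueModel:
    """Toy model of one queue with consumer groups (illustrative only)."""

    MAX_GROUPS = 3  # limit stated in the text above

    def __init__(self):
        self.messages = []   # message bodies in arrival order
        self.acked = {}      # group name -> set of acknowledged message indexes

    def add_group(self, name):
        if name in self.acked:
            return
        if len(self.acked) >= self.MAX_GROUPS:
            raise ValueError('a queue supports at most three consumer groups')
        self.acked[name] = set()

    def send(self, body):
        self.messages.append(body)

    def consume(self, group):
        """Return (index, body) pairs this group has not acknowledged yet."""
        done = self.acked[group]
        return [(i, body) for i, body in enumerate(self.messages) if i not in done]

    def ack(self, group, index):
        """Acknowledge a message for one group only."""
        self.acked[group].add(index)


queue = QueueModel()
queue.add_group('billing')
queue.add_group('audit')
queue.send('order-1')
queue.send('order-2')

# 'billing' acknowledges the first message; 'audit' still sees both.
queue.ack('billing', 0)
billing_view = queue.consume('billing')
audit_view = queue.consume('audit')
print(billing_view)
print(audit_view)
```

An acknowledgement affects only the group that issued it, which is why the second group still receives every message.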

List Queue Groups

This interface is used to query all groups of a Queue and to filter the output with query parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

queue_id = 'queue_id'
for raw in conn.dms.groups(queue_id):
    print(raw)

Create Queue Group

This interface is used to create a Queue Group with parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    'queue': 'queue_id',
    'name': 'test'
}
# create_group is expected to return the single Group that was created.
group = conn.dms.create_group(**attrs)
print(group)

Find Queue Group

This interface is used to find a Queue Group by ID or name.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

queue = 'name_or_id'
queue = conn.dms.find_queue(name_or_id=queue)
group = 'name_or_id'
raw = conn.dms.find_group(queue=queue, name_or_id=group)
print(raw)

Delete Queue Group

This interface is used to delete a Queue Group by ID or an instance of class Group.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')


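# Look up the queue and group first. Assumption: delete_group mirrors
# find_group and accepts the queue plus a group ID or Group instance.
queue = 'name_or_id'
queue = conn.dms.find_queue(name_or_id=queue)
group = 'name_or_id'
group = conn.dms.find_group(queue=queue, name_or_id=group)
conn.dms.delete_group(queue, group)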

DMS Messages

Messages are JavaScript Object Notation (JSON) objects used for transmitting information. They can be sent one by one or in batches. Batch sending is available only through the DMS application programming interfaces (APIs).
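As a minimal sketch of what a batch of JSON messages can look like, the snippet below builds a list of message objects with the `body` and `attributes` keys used by the examples in this section and serializes it with the standard `json` module. The `build_batch` helper and the `messages` wrapper key in the serialized output are assumptions made for illustration, not a statement about the exact wire format.

```python
import json


def build_batch(bodies, attributes=None):
    """Build one message dict per body, each with its own attributes copy."""
    attributes = attributes or {}
    return [{'body': body, 'attributes': dict(attributes)} for body in bodies]


batch = build_batch(['test1', 'test2'], {'attribute1': 'value1'})

# Serialize the whole batch as a single JSON document.
payload = json.dumps({'messages': batch}, indent=2)
print(payload)
```

A list built this way has the same shape as the `messages` value passed to `send_messages`.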

Send Message

This interface is used to send a message with parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')


attrs = {
    'queue': 'queue_id',
    'messages': [
        {
            'body': 'test1',
            'attributes': {
                'attribute1': 'value1',
                'attribute2': 'value2'
            }
        },
        {
            'body': 'test2',
            'attributes': {
                'attribute1': 'value1',
                'attribute2': 'value2'
            }
        }
    ]
}
for raw in conn.dms.send_messages(**attrs):
    print(raw)

Consume Message

This interface is used to consume messages from a Queue, identified by Queue ID and Group ID or by instances of class Queue and class Group.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    'queue': 'queue_id',
    'group': 'group_id'
}
for raw in conn.dms.consume_message(**attrs):
    print(raw)

Confirm Message

This interface is used to confirm consumed messages by passing a list of messages, each identified by its handler and a status.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

queue_name_or_id = 'queue_name_or_id'
queue = conn.dms.find_queue(name_or_id=queue_name_or_id)
group_name_or_id = 'group_name_or_id'
group = conn.dms.find_group(queue, name_or_id=group_name_or_id)


attrs = {
    'queue': queue,
    'group': group,
    'messages': [
        {
            'handler': 'handler_id',
            'status': 'success'
        }
    ]
}
for raw in conn.dms.ack_message(**attrs):
    print(raw)
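Consuming and confirming usually happen together, so the two calls above can be combined into one loop. The following is a hedged sketch rather than part of the SDK: `consume_and_confirm` and `FakeDms` are hypothetical names, and the sketch assumes that each consumed message exposes a `handler` attribute matching the `handler` key used in the confirmation example. `FakeDms` stands in for `conn.dms` so the flow can be run without cloud credentials.

```python
from types import SimpleNamespace


def consume_and_confirm(dms, queue, group, process):
    """Consume messages, process each one, and confirm the successful ones.

    Messages whose processing raises are left unconfirmed.
    """
    confirmed = []
    for message in dms.consume_message(queue=queue, group=group):
        try:
            process(message)
        except Exception:
            continue  # skip confirmation for failed messages
        # Assumption: the consumed message carries its handler as `.handler`.
        confirmed.append({'handler': message.handler, 'status': 'success'})
    if confirmed:
        dms.ack_message(queue=queue, group=group, messages=confirmed)
    return confirmed


class FakeDms:
    """Hypothetical stand-in for conn.dms, used only to exercise the sketch."""

    def __init__(self, handlers):
        self._pending = [SimpleNamespace(handler=h, body='payload') for h in handlers]
        self.acked = []

    def consume_message(self, queue, group):
        return list(self._pending)

    def ack_message(self, queue, group, messages):
        self.acked.extend(messages)
        return messages


fake = FakeDms(['h-1', 'h-2'])
confirmed = consume_and_confirm(fake, 'queue_id', 'group_id', process=lambda m: None)
print(confirmed)
```

With a real connection, `conn.dms` would be passed in place of `fake`, provided the attribute assumption above holds.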

DMS Instances

Kafka premium instances use physically isolated computing, storage, and bandwidth resources. You can customize partitions and replicas for Kafka topics in the instances and configure the network bandwidth as required. The instances are ready to use out of the box and relieve you of deployment and O&M work so that you can focus on developing your services.

List Instances

This interface is used to query all Instances and to filter the output with query parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

for raw in conn.dms.instances():
    print(raw)

Find Instance

This interface is used to find an Instance by ID or name.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

instance = 'name_or_id'
raw = conn.dms.find_instance(name_or_id=instance)
print(raw)

Get Instance

This interface is used to get an Instance by ID or an instance of class Instance.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')


# Assumption: get_instance follows the same pattern as get_queue.
instance = 'instance_id'
raw = conn.dms.get_instance(instance)
print(raw)

Create Instance

This interface is used to create an Instance with parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    'name': 'test-instance-2',
    'engine': 'kafka',
    'engine_version': '2.3.0',
    'storage_space': '600',
    'vpc_id': 'vpc_id',
    'security_group_id': 'sec_group_id',
    'subnet_id': 'network_id',
    'available_zones': ['az_id'],
    'product_id': 'product_id',
    'storage_spec_code': 'dms.physical.storage.ultra'
}
# create_instance returns the single Instance that was created.
instance = conn.dms.create_instance(**attrs)
print(instance)

Delete Instance

This interface is used to delete an Instance by ID or an instance of class Instance.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

instance = 'name_or_id'
instance = conn.dms.find_instance(name_or_id=instance)

raw = conn.dms.delete_instance(instance)
print(raw)

Update Instance

This interface is used to update an Instance by ID or an instance of class Instance.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

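attrs = {
    'name': 'test-instance-2',
    'engine': 'kafka',
    'engine_version': '2.3.0',
    'storage_space': '600',
    'vpc_id': 'vpc_id',
    'security_group_id': 'sec_group_id',
    'subnet_id': 'network_id',
}
instance = 'name_or_id'
instance = conn.dms.find_instance(name_or_id=instance)

# Assumption: update_instance takes the instance plus the attributes to
# change, mirroring the other instance calls in this section.
raw = conn.dms.update_instance(instance, **attrs)
print(raw)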

Restart Instance

This interface is used to restart an Instance by ID or an instance of class Instance.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

instance = 'name_or_id'
instance = conn.dms.find_instance(name_or_id=instance)

raw = conn.dms.restart_instance(instance)
print(raw)

DMS Instance Topics

After creating a Kafka premium instance, you must create a topic in the instance for creating and retrieving messages.

List Instance Topics

This interface is used to query all topics of an Instance and to filter the output with query parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

instance = 'name_or_id'
instance = conn.dms.find_instance(name_or_id=instance)

for raw in conn.dms.topics(instance):
    print(raw)

Create Instance Topic

This interface is used to create an Instance topic with parameters.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    'id': 'topic_name'  # Required
}
instance = 'name_or_id'
instance = conn.dms.find_instance(name_or_id=instance)

# create_topic is expected to return the single topic that was created.
topic = conn.dms.create_topic(instance, **attrs)
print(topic)

Delete Instance Topic

This interface is used to delete a topic of an Instance by topic ID.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

instance = 'name_or_id'
instance = conn.dms.find_instance(name_or_id=instance)

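# Assumption: delete_topic accepts the instance and the ID of the topic to
# remove; check the proxy signature before relying on it.
topic = 'topic_id'
raw = conn.dms.delete_topic(instance, topic)
print(raw)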

Misc

Extra APIs allow querying of DMS-specific data.

List Instance Availability Zones

This interface is used to query all availability zones (AZs) available to instances.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

for raw in conn.dms.availability_zones():
    print(raw)

List Products

This interface is used to query all supported DMS products.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

for raw in conn.dms.products():
    print(raw)

List Maintenance Windows

This interface is used to query all Maintenance Windows.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

for raw in conn.dms.maintenance_windows():
    print(raw)