Subscribe to real-time data to Kafka

This document covers advanced usage of SensorsData Analysis. It involves calling the SensorsData OpenAPI and is intended only as a reference for users with relevant experience.

Overview

The SensorsData system uses an open architecture that lets users cover additional usage scenarios by subscribing to real-time data. After the server receives data sent by the SDK, it processes the data and stores it in the database; it can also write the data to a Kafka message queue for use by downstream custom computing modules.
This article describes how to subscribe to real-time data through a Kafka message queue.

Subscription Requirements

Subscribing to real-time data requires the following:

  • Only the private deployment version supports real-time data subscription through Kafka.
  • An independent Kafka cluster must be prepared to receive the subscribed data. This must not be the Kafka cluster of SensorsData Analysis itself; use a self-built Kafka cluster or a cloud-hosted Kafka service such as Alibaba Cloud. Kerberos authentication is not currently supported.
  • The machine where SensorsData Analytics is deployed must be able to access the Kafka cluster that receives data. You can verify connectivity by running telnet {kafka_broker_hostname} 9092 on the machine where SensorsData Analytics is deployed.
  • It is recommended to use the same Kafka version as the SensorsData server (currently 2.8.2).
  • SensorsData Analytics version requirement: Analysis Cloud 3.0.1+ package.
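
Where telnet is not installed, the same connectivity check can be approximated with a few lines of Python. This is a minimal sketch, not an official tool: the function name can_reach_broker and the 3-second timeout are illustrative choices, and the check only confirms that a TCP connection can be opened, not that the Kafka broker is healthy.

```python
import socket


def can_reach_broker(host: str, port: int = 9092, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the hostname and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections, and timeouts.
        return False
```

Run it on the machine where SensorsData Analytics is deployed, for example can_reach_broker("{kafka_broker_hostname}") with the placeholder replaced by a real broker hostname.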

Preparation

Prepare Kafka and Topic to receive data

Users need to prepare their own Kafka cluster to receive subscription data. SensorsData Analytics will push real-time data to this cluster. The following topics need to be created in the Kafka cluster in advance:

Data type | Topic name example | Description
User attribute data | simple_attribute_data_topic | Used to receive user attribute data
User event data | event_data_topic | Used to receive user event data
User detail data | detail_data_topic | Used to receive user detail data

Prepare OpenAPI authentication information

To subscribe to real-time data, you need to call the SensorsData OpenAPI interface. Please refer to OpenAPI authentication method to obtain the API key and construct the request header (api-key, sensorsdata-project, etc.).
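
As a rough illustration of how the request headers fit together, the sketch below builds (but does not send) an HTTP request with Python's standard library. The helper name build_openapi_request is hypothetical; the header names and port 8107 are taken from the examples in this document, and everything else is an assumption to adapt to your environment.

```python
import json
import urllib.request


def build_openapi_request(host, path, api_key, project, body=None, method="POST"):
    """Build (but do not send) a urllib Request for a SensorsData OpenAPI call."""
    headers = {
        "api-key": api_key,                # OpenAPI authentication key
        "sensorsdata-project": project,    # project name, e.g. "production"
        "Content-Type": "application/json;charset=UTF-8",
    }
    # Serialize the JSON body only when one is supplied (GET calls have none).
    data = json.dumps(body).encode("utf-8") if body is not None else None
    url = f"http://{host}:8107{path}"
    return urllib.request.Request(url, data=data, headers=headers, method=method)
```

The returned object can be passed to urllib.request.urlopen to perform the call; keeping construction separate from sending makes the request easy to inspect before it reaches the server.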

Registering a Subscriber

Before officially subscribing to data, you need to register the subscriber in the SensorsData system. The subscriber information includes:

  • application_name: a unique identifier for the subscriber's service, chosen by the user
  • Kafka information for receiving data: the broker addresses and topics of the Kafka cluster prepared in the Preparation step

The SensorsData system will generate a unique identifier application_id for the subscriber based on application_name, and bind the application_id to the Kafka that receives the data. When you call the subscription data interface later, you only need to carry the application_id, and all data subscribed under this application_id will be sent to the bound Kafka.

Note: In most cases a single subscriber is enough. If several downstream computing jobs need different data, subscribe all of the required data to Kafka and let each job consume it with its own group.id and filter out what it needs.

Call OpenAPI to register a subscriber

  • Interface: http://{host}:8107/api/v3/horizon/v1/data-subscription/application/add
  • Request method: POST request
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project:production
  • Request BODY example :

 {
  "application": {
    "application_name": "{application_name}",
    "application_type": "KAFKA_BASED_APP",
    "kafka_based_app_config": {
      "data_format_type": "NESTED_JSON",
      "profile_kafka_descriptor": {
        "type": "CUSTOMIZED_KAFKA",
        "customized_kafka_descriptor": {
          "bootstrap_servers": "{broker_list}",
          "topic_name": "simple_attribute_data_topic"
        }
      },
      "event_kafka_descriptor": {
        "type": "CUSTOMIZED_KAFKA",
        "customized_kafka_descriptor": {
          "bootstrap_servers": "{broker_list}",
          "topic_name": "event_data_topic"
        }
      },
      "detail_kafka_descriptor": {
        "type": "CUSTOMIZED_KAFKA",
        "customized_kafka_descriptor": {
          "bootstrap_servers": "{broker_list}",
          "topic_name": "detail_data_topic"
        }
      }
    }
  }
}

The complete curl command example for registering a subscriber is as follows:

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/add' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application":{
        "application_name": "{application_name}",
        "application_type": "KAFKA_BASED_APP",
        "kafka_based_app_config": {
            "data_format_type": "NESTED_JSON",             
            "profile_kafka_descriptor":{
                "type": "CUSTOMIZED_KAFKA",
                "customized_kafka_descriptor": {
                    "bootstrap_servers": "{broker_list}",
                    "topic_name": "simple_attribute_data_topic"
                }
            },
            "event_kafka_descriptor":{
                "type": "CUSTOMIZED_KAFKA",
                "customized_kafka_descriptor": {
                    "bootstrap_servers": "{broker_list}",
                    "topic_name": "event_data_topic"
                }
            },
            "detail_kafka_descriptor":{
                "type": "CUSTOMIZED_KAFKA",
                "customized_kafka_descriptor": {
                    "bootstrap_servers": "{broker_list}",
                    "topic_name": "detail_data_topic"
                }
            }
        }
    }
}'

The relevant parameters that need to be passed in the curl command are as follows:

Parameter name | Example value | Description
api-key | #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx | OpenAPI authentication information; refer to OpenAPI authentication method
application_name | DataWorks | The unique identifier of the subscriber's business, chosen by the user
broker_list | 10.1.111.110:9092,10.1.111.111:9092,10.1.111.112:9092 | The broker addresses of the Kafka cluster that receives the data. If the cluster has multiple brokers, separate them with commas.
data_format_type | NESTED_JSON | The data format that the subscriber expects to receive. Two formats are currently supported; the default is NESTED_JSON.

  • NESTED_JSON: The received data is nested JSON. Each field is wrapped in a structure with an explicit name, data_type and value. For example:

     .....
    {
        "name": "order_id",
        "data_type": "STRING",
        "value": "oid_1234565"
    }
    .....
  • FLATTEN_JSON: The received data is flattened JSON. The field name is used directly as the key, and the value is the field content. For example:

     .....
    "order_id": "oid_1234565"
    .....

host | 192.168.1.1 | The domain name of the SensorsData analytics cluster, or the IP address of its metadata node
project_name | production | The English name of the project. Registered subscribers are not project-specific, so any existing project name can be used.
topic_name | simple_attribute_data_topic, event_data_topic, detail_data_topic | The name of the Kafka topic that receives the data, that is, a topic created in advance in the step Prepare Kafka and Topic to receive data.

  • To subscribe to user attribute data, include profile_kafka_descriptor in the request body, with topic_name set to the topic that receives user attribute data.
  • To subscribe to event data, include event_kafka_descriptor in the request body, with topic_name set to the topic that receives event data.
  • To subscribe to detail data, include detail_kafka_descriptor in the request body, with topic_name set to the topic that receives detail data.
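
The registration body can also be assembled programmatically, which avoids copy-paste mistakes across the three descriptors. The following is a sketch only: build_register_body and its topics argument are hypothetical names introduced for illustration, and the JSON field names are copied from the request example above.

```python
def build_register_body(application_name, broker_list, topics,
                        data_format_type="NESTED_JSON"):
    """Build the request body for the application/add interface.

    topics maps a descriptor kind ("profile", "event", "detail") to a topic
    name; only the kinds present in the mapping are included in the body.
    """
    allowed = ("profile", "event", "detail")
    config = {"data_format_type": data_format_type}
    for kind, topic_name in topics.items():
        if kind not in allowed:
            raise ValueError(f"unknown descriptor kind: {kind}")
        config[f"{kind}_kafka_descriptor"] = {
            "type": "CUSTOMIZED_KAFKA",
            "customized_kafka_descriptor": {
                "bootstrap_servers": broker_list,
                "topic_name": topic_name,
            },
        }
    return {
        "application": {
            "application_name": application_name,
            "application_type": "KAFKA_BASED_APP",
            "kafka_based_app_config": config,
        }
    }
```

For example, build_register_body("DataWorks", "10.1.111.110:9092", {"event": "event_data_topic"}) yields a body that binds only the event topic.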

Call SensorsData OpenAPI to register a subscriber. The returned information is as follows:

 {
  "code": "SUCCESS",
  "request_id": "031750310c5963b984c39480cc9138da",
  "data": {
    "application": {
      "application_name": "{application_name}",
      "application_id": "{application_id}",
      "application_type": "KAFKA_BASED_APP",
      "kafka_based_app_config": {
        "data_format_type": "NESTED_JSON",         
        "profile_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "simple_attribute_data_topic"
          }
        },
        "event_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "event_data_topic"
          }
        },
        "detail_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "detail_data_topic"
          }
        }
      }
    }
  }
}

Pay attention to the following fields in the response:

  • code: indicates whether the call to SensorsData OpenAPI succeeded. code = SUCCESS means the subscriber was registered successfully.
  • application_id: the unique identifier of the subscriber. It must be passed when calling the subscription data interface later.
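
A small helper can check code and pull out application_id in one step. This sketch assumes the response shape shown above; the shape of error responses is not documented here, so the error branch only reports code and request_id, and the name extract_application_id is illustrative.

```python
import json


def extract_application_id(response_text: str) -> str:
    """Return application_id from a registration response, or raise on failure."""
    payload = json.loads(response_text)
    if payload.get("code") != "SUCCESS":
        # Assumption: failed calls still return JSON with "code" and "request_id".
        raise RuntimeError(
            f"registration failed: code={payload.get('code')!r}, "
            f"request_id={payload.get('request_id')!r}"
        )
    return payload["data"]["application"]["application_id"]
```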

Check whether the subscriber was registered successfully

You can call SensorsData OpenAPI to check whether the subscriber has been registered in the SensorsData system.

  • Interface: http://{host}:8107/api/v3/horizon/v1/data-subscription/application/get
  • Request method: POST request
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project:production
    • Request BODY example :

 {
  "application_name": "{application_name}"
}

The curl example for querying a registered subscriber is as follows:

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/get' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{"application_name": "{application_name}"}'

The interface return example is as follows:

 {
  "code": "SUCCESS",
  "request_id": "031750310c5963b984c39480cc9138da",
  "data": {
    "application": {
      "application_name": "{application_name}",
      "application_id": "{application_id}",
      "application_type": "KAFKA_BASED_APP",
      "kafka_based_app_config": {
        "profile_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "simple_attribute_data_topic"
          }
        },
        "event_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "event_data_topic"
          }
        },
        "detail_kafka_descriptor": {
          "type": "CUSTOMIZED_KAFKA",
          "customized_kafka_descriptor": {
            "bootstrap_servers": "{broker_list}",
            "topic_name": "detail_data_topic"
          }
        }
      }
    }
  }
}

Additional subscriber configuration

Initially, users may only need to subscribe to certain types of data, and only bind certain Kafka topics when registering a subscriber. If you want to subscribe to other types of data later, you can call OpenAPI to bind additional Kafka topics that receive other types of data.

  • Interface: http://{host}:8107/api/v3/horizon/v1/data-subscription/application/config/append
  • Request method: POST request
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project:production
    • Request BODY example :

 {
    "application_name": "{application_name}",
    "kafka_based_app_config": {
        "profile_kafka_descriptor": {
            "type": "CUSTOMIZED_KAFKA",
            "customized_kafka_descriptor": {
                "bootstrap_servers": "10.1.132.100:9092",
                "topic_name": "simple_attribute_data_topic"
            }
        }
    }
}

The complete curl command example is as follows:

 curl -L -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/config/append' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application_name": "{application_name}",
    "kafka_based_app_config": {
        "profile_kafka_descriptor": {
            "type": "CUSTOMIZED_KAFKA",
            "customized_kafka_descriptor": {
                "bootstrap_servers": "{broker_list}",
                "topic_name": "horizon_subscription_public_profile_stream"
            }
        }
    }
}'

Subscribing to Data

Use OpenAPI to subscribe to the real-time data you need from the SensorsData system. After a subscription succeeds, newly reported data that falls within the subscription scope is pushed to the Kafka cluster that receives the data.

  • Interface address : http://{host}:8107/api/v3/horizon/v1/data-subscription/create
  • Request method : POST
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project:production

The following are some of the parameters that need to be passed in when calling the subscription OpenAPI:

Parameter name | Example value | Description
application_id | dataworks.company_a.app.sensorsdata.cloud | Unique identifier of the subscriber
subscriber | dataworks_a | Business identifier of the subscriber. There can be multiple subscribers under the same application_id; the subscriber value shows which business party a subscription record belongs to.

Subscribe to user attribute data

Subscribe to specified user attributes

Example : Subscribe to three user attributes: name, gender, age 

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
    "subscription": {
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
            "entity_name": "user",
            "attribute_type": "SIMPLE",
            "simple_attribute_config": {
                "attribute_names": [
                    "name",
                    "gender",
                    "age"
                ]
            }
        }
    }
}'

Subscribe to all user attributes

Use the wildcard * as the attribute name to subscribe to all common user attributes:

  • The wildcard covers all common user attributes and excludes virtual attributes.
  • Newly created attributes are added to the subscription automatically (it can take up to 2 minutes after an attribute is created for the subscription to take effect).

Example : Subscribe to all common user attributes

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
    "subscription": {
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
            "entity_name": "user",
            "attribute_type": "SIMPLE",
            "simple_attribute_config": {
                "attribute_names": [
                    "*"
                ]
            }
        }
    }
}'

Subscribing to event data

Subscribe to some attributes under the specified event

Example: Subscribe to the order_id and pay_time properties of the payOrder event

# Subscribe to order_id and pay_time of the payOrder event
curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "EVENTS",
      "events_attribute_config": {
        "event_name": "payOrder",
        "attribute_names": [
          "order_id",
          "pay_time"
        ]
      }
    }
  }
}'

Subscribe to all properties under the specified event

Use the wildcard * as the property name to subscribe to all normal properties of the event, excluding virtual properties

Example : Subscribe to all common properties of the payOrder event

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "EVENTS",
      "events_attribute_config": {
        "event_name": "payOrder",
        "attribute_names": [
          "*"
        ]
      }
    }
  }
}'

Subscribe to all properties under all events

Use the wildcard * as the event name to subscribe to all normal events, excluding virtual events

Example : Subscribe to all common properties of all common events

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
-d '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "EVENTS",
      "events_attribute_config": {
        "event_name": "*",
        "attribute_names": [
          "*"
        ]
      }
    }
  }
}'

Subscribing to detail data

Subscribe to some properties of a detail table

Example: Subscribe to the product_id and price properties of the cart detail table

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
--data '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "DETAILS",
      "details_attribute_config": {
        "detail_name": "cart",
        "attribute_names": [
          "product_id",
          "price"
        ]
      }
    }
  }
}'

Subscribe to all properties of a detail table

Use the wildcard * as the property name to subscribe to all common properties of the detail table, excluding virtual properties.

Example: Subscribe to all common properties of the cart detail table

curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/create' \
-H 'sensorsdata-project: {project_name}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'api-key: {api-key}' \
--data '{
  "subscription": {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "target_type": "ENTITY_DATA",
    "entity_data_subscription_config": {
      "entity_name": "user",
      "attribute_type": "DETAILS",
      "details_attribute_config": {
        "detail_name": "cart",
        "attribute_names": [
          "*"
        ]
      }
    }
  }
}'
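
The subscription requests above differ only in attribute_type and in the matching *_attribute_config object, so they can be generated by one function. The sketch below is illustrative rather than an official client: build_subscription_body and its target_name parameter are hypothetical names, while the JSON field names are copied from the curl examples.

```python
def build_subscription_body(application_id, subscriber, attribute_type,
                            attribute_names=("*",), target_name=None):
    """Build the request body for the data-subscription/create interface.

    attribute_type is "SIMPLE", "EVENTS" or "DETAILS". target_name is the
    event name (EVENTS) or detail table name (DETAILS); SIMPLE ignores it.
    """
    names = list(attribute_names)
    config = {"entity_name": "user", "attribute_type": attribute_type}
    if attribute_type == "SIMPLE":
        config["simple_attribute_config"] = {"attribute_names": names}
    elif attribute_type in ("EVENTS", "DETAILS"):
        if not target_name:
            raise ValueError(f"target_name is required for {attribute_type}")
        if attribute_type == "EVENTS":
            config["events_attribute_config"] = {
                "event_name": target_name, "attribute_names": names}
        else:
            config["details_attribute_config"] = {
                "detail_name": target_name, "attribute_names": names}
    else:
        raise ValueError(f"unsupported attribute_type: {attribute_type}")
    return {
        "subscription": {
            "application": application_id,
            "subscriber": subscriber,
            "target_type": "ENTITY_DATA",
            "entity_data_subscription_config": config,
        }
    }
```

For instance, build_subscription_body(app_id, "dataworks_a", "EVENTS", ["order_id", "pay_time"], target_name="payOrder") reproduces the payOrder example, and passing target_name="*" with the default attribute_names corresponds to subscribing to all events.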

Subscription interface return information

After calling the SensorsData subscription OpenAPI, the returned information is as follows:

{
    "code": "SUCCESS",
    "request_id": "5d3d3273cb2d44fd9abceefd2b5cdad5",
    "data": {
        "subscription_id": 16001,
        "subscriber": "{subscriber}",
        "application": "{application_id}",
        "project_id": 2,
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
            "entity_name": "user",
            "attribute_type": "EVENTS",
            "events_attribute_config": {
                "event_name": "payOrder",
                "attribute_names": [
                    "order_id",
                    "pay_time"
                ]
            }
        },
        "access_info": {
            "creator_id": "",
            "modifier_id": "",
            "create_time": "2023-05-11T10:53:32.553Z",
            "update_time": "2023-05-11T10:53:32.553Z"
        }
    }
}

Pay attention to the following fields in the response:

  • code: indicates whether the OpenAPI request succeeded. code = SUCCESS means the subscription was created.
  • subscription_id: the ID of the subscription record; it uniquely identifies a successful subscription.

Consume data and process it

After a subscription succeeds, a copy of each newly reported record that falls within the subscription scope is sent to the Kafka cluster that receives the data.

User business modules can consume the data received by the Kafka cluster and process it as needed.

Consuming data

Refer to the Kafka subscription example to implement data consumption
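
Independently of which Kafka client is used, the consumer usually needs to decode each message and may want to drop duplicates using trace_id. The sketch below assumes raw_messages is any iterable of message values (bytes or str) produced by the Kafka client of your choice; no specific client library is implied, and the function name and the max_tracked bound are illustrative.

```python
import json
from collections import OrderedDict


def iter_unique_records(raw_messages, max_tracked=100_000):
    """Yield decoded records, skipping any whose trace_id was already seen."""
    seen = OrderedDict()  # remembers recent trace_ids in arrival order
    for raw in raw_messages:
        record = json.loads(raw)
        trace_id = record.get("trace_id")
        if trace_id is not None:
            if trace_id in seen:
                continue  # duplicate delivery, skip it
            seen[trace_id] = None
            if len(seen) > max_tracked:
                seen.popitem(last=False)  # evict the oldest id to bound memory
        yield record
```

Because only the most recent max_tracked identifiers are remembered, duplicates that arrive far apart can still pass through; a persistent store would be needed for stricter guarantees.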

Data Format

If NESTED_JSON was specified as the expected data format when registering the subscriber, the data received by the Kafka cluster looks like this:

  • User attribute data

 {
    "trace_id": "117274217502270410197",
    "organization_id": "org-sep-xxx",
    "project_id": 2,
    "project": "production",  
    "record": {
        "update_time": 1727421750034,
        "entity_record": {
            "operation_type": "UPSERT",
            "entity_name": "user",
            "attribute_type": "SIMPLE",
            "entity_id": {
                "name": "id",
                "data_type": "BIGINT",
                "value": "7356491433591617590"
            },
            "simple_data": {
                "data": [{
                    "name": "age",
                    "data_type": "NUMBER",
                    "value": "25.0"
                }, {
                    "name": "gender",
                    "data_type": "STRING",
                    "value": "男"
                }, {
                    "name": "name",
                    "data_type": "STRING",
                    "value": "张三"
                }]
            },
            "events_data": null,
            "details_data": null
        }
    }
}

  • Event data

 {
    "trace_id": "21727421748010075027",
    "organization_id": "org-sep-xxx",
    "project_id": 2,
    "project": "production",  
    "record": {
        "update_time": 1727421747856,
        "entity_record": {
            "operation_type": "UPSERT",
            "entity_name": "user",
            "attribute_type": "EVENTS",
            "entity_id": {
                "name": "id",
                "data_type": "BIGINT",
                "value": "7356491433591617592"
            },
            "simple_data": null,
            "events_data": {
                "event_name": "payOrder",
                "data": [{
                    "name": "order_id",
                    "data_type": "STRING",
                    "value": "oid_1234565"
                }, {
                    "name": "pay_time",
                    "data_type": "DATETIME",
                    "value": "1720540800000"
                }]
            },
            "details_data": null
        }
    }
}

  • Detail data

 {
    "trace_id": "51727421749008052549",
    "organization_id": "org-sep-12330",
    "project_id": 2,
    "project": "production",
    "record": {
        "update_time": 1727421747865,
        "entity_record": {
            "operation_type": "UPSERT",
            "entity_name": "user",
            "attribute_type": "DETAILS",
            "entity_id": {
                "name": "user_id",
                "data_type": "BIGINT",
                "value": "7356491433591617590"
            },
            "simple_data": null,
            "events_data": null,
            "details_data": {
                "detail_name": "cart",
                "data": [{
                    "name": "price",
                    "data_type": "NUMBER",
                    "value": "111.11"
                }, {
                    "name": "product_id",
                    "data_type": "STRING",
                    "value": "产品5"
                },{
                    "name": "id",
                    "data_type": "STRING",
                    "value": "subscription_cart_cac278d47e_1"
                }],
                "primary_keys": [{
                    "name": "id",
                    "data_type": "STRING",
                    "value": "subscription_cart_cac278d47e_1"
                }]
            }
        }
    }
}
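
In all three NESTED_JSON messages the attributes sit in a data array under the section that matches attribute_type. A short sketch of reading them into a plain dictionary follows; attributes_by_name is a hypothetical helper, and values are left as strings exactly as received.

```python
def attributes_by_name(message: dict) -> dict:
    """Collect the {name: value} pairs carried by a NESTED_JSON message."""
    entity = message["record"]["entity_record"]
    # Each attribute_type stores its attributes under a different key.
    section_key = {
        "SIMPLE": "simple_data",
        "EVENTS": "events_data",
        "DETAILS": "details_data",
    }[entity["attribute_type"]]
    section = entity.get(section_key) or {}
    return {item["name"]: item["value"] for item in section.get("data", [])}
```

Applied to the event message above, this returns the order_id and pay_time values keyed by attribute name.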

If FLATTEN_JSON was specified as the expected data format when registering the subscriber, the data received by the Kafka cluster looks like this:

 {
  "trace_id": "21745403297474082219424",
  "organization_id": "org-sep-xxx",
  "project": "production",
  "project_id": 2,
  "entity_name": "user",
  "update_time": 1745403294934,
  "operation_type": "UPSERT",
  "entity_id": 8319627862933161993,
  "properties": {
    "price": 111.11,
    "order_id": "oid_1234565",
    "pay_time": "2023-11-16 09:22:13.000000"
  },
  "distinct_id": "device_3970185b-ff9e-413e-acb3-133c774740771745391152227.5833",
  "event": "OrderStatusChange",
  "time": 1745402074755
}

Field Description

Some of the fields in the subscribed data are described below.

Field name | Description
trace_id | Unique identifier of a record; it can be used for deduplication and troubleshooting
organization_id | SensorsData organization ID, which uniquely identifies a privately deployed SensorsData analysis cluster. Users can ignore it.
project_id | Project ID, which identifies the SensorsData project that the data belongs to. For the mapping between project name and project ID, refer to Project Management Query.
entity_name | Entity name; by default only the user entity (user) exists
attribute_type | Data type, one of three categories:

  • SIMPLE: common user attributes
  • EVENTS: events
  • DETAILS: details

entity_id | Entity ID, equal to user_id in the events table or id in the users table; see Identifying Users
simple_data | Common user attribute data
events_data | Event attribute data, including the event name and a set of attributes
event_name | Event name
details_data | Detail attribute data, including the detail name, a set of attributes, and the primary key values
detail_name | Detail name, that is, the detail table name
primary_keys | The primary key attribute names and values of the detail table, which uniquely identify a row of the detail table
data | A set of attributes; each attribute has a name, a data type, and a value
name | Attribute name, equal to the name of a column in the events table or users table
value | Attribute value, always encoded as a string
data_type | Data type of the attribute, one of the following:

  • BOOL: boolean; use Boolean.parseBoolean(value) to read the value
  • INT: integer; use Long.parseLong(value) to read the value
  • NUMBER: integer or floating-point number; use Double.parseDouble(value) to read the value
  • STRING: string
  • LIST: list; use Arrays.asList(value.split("\n")) to read the value
  • DATETIME: 13-digit millisecond timestamp; use Long.parseLong(value) to read the value
  • DATE: date string in a format such as yyyy-MM-dd or yyyy-MM-dd HH:mm:ss
  • BIGINT: long integer; use Long.parseLong(value) to read the value
  • DECIMAL: use new BigDecimal(value) to read the value
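
For consumers written in Python rather than Java, the conversions listed above can be mirrored roughly as follows. This is a sketch under assumptions: parse_value is a hypothetical helper, DATETIME is converted to a UTC datetime for convenience (the source only states that it is a millisecond timestamp), and DATE is left as a string because several formats are possible.

```python
from datetime import datetime, timezone
from decimal import Decimal


def parse_value(data_type: str, value: str):
    """Convert a string-encoded attribute value according to its data_type."""
    if data_type == "BOOL":
        # Same rule as Java's Boolean.parseBoolean: only "true" (any case) is True.
        return value.lower() == "true"
    if data_type in ("INT", "BIGINT"):
        return int(value)
    if data_type == "NUMBER":
        return float(value)
    if data_type == "LIST":
        return value.split("\n")
    if data_type == "DATETIME":
        # 13-digit millisecond timestamp, interpreted here as UTC.
        return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)
    if data_type == "DECIMAL":
        return Decimal(value)
    # STRING, DATE and any unrecognized type are returned unchanged.
    return value
```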

How to cancel your subscription

If users no longer want to receive certain subscribed data, they can cancel the subscription by calling the SensorsData OpenAPI.

Query subscribed data

Users can query currently subscribed data by calling OpenAPI and cancel subscription as needed.

  • Interface address: http://{host}:8107/api/v3/horizon/v1/data-subscription/list
  • Request method : GET
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
    • PARAM
      • application: {application_id}
      • subscriber: {subscriber}

The complete curl command to query subscribed data is as follows:

 curl -L -X GET 'http://{host}:8107/api/v3/horizon/v1/data-subscription/list?application={application_id}&subscriber={subscriber}' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}'

An example of the information returned by the interface is as follows:

 {
  "code": "SUCCESS",
  "request_id": "031831579fc7617a600be3946389e432",
  "data": {
    "subscriptions": [
      {
        "project_id": 1,
        "subscription_id": 116,
        "application": "{application_id}",
        "subscriber": "{subscriber}",
        "target_type": "ENTITY_DATA",
        "entity_data_subscription_config": {
          "entity_name": "user",
          "attribute_type": "SIMPLE",
          "simple_attribute_config": {
            "attribute_names": [
              "phone_number",
              "quota",
              "register_time"
            ]
          }
        },
        "access_info": {
          "creator_id": "",
          "modifier_id": "",
          "create_time": "2025-04-02T09:06:10.510Z",
          "update_time": "2025-04-02T09:06:10.510Z"
        }
      }
    ]
  }
}

Pay attention to the following information in the returned information:

  • subscriber: subscriber service identifier
  • subscription_id: subscription record id, uniquely identifies a successful subscription

When canceling a subscription, you need to pass in application_id, subscriber and subscription_id. The subscription is canceled only if all three match an existing subscription record.

Unsubscribe

Call OpenAPI to cancel subscriptions, using the records returned by the query in Query subscribed data. After the call completes, you can query the subscribed data again to confirm that the subscriptions were canceled.

  • API address: http://{host}:8107/api/v3/horizon/v1/data-subscription/batch-delete
  • Request method : POST
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
  • Request BODY example:

 {
    "application": "{application_id}",
    "subscriber": "{subscriber}",
    "subscription_ids": [{subscription_id1}, {subscription_id2}]
}

The complete curl command to unsubscribe is as follows:

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/batch-delete' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application":"{application_id}",
    "subscriber":"{subscriber}",
    "subscription_ids":[{subscription_id}]
}'

Some of the parameters that need to be passed in the curl command are described as follows:

Parameter name | Example value | Description
application_id | dataworks.company_a.app.sensorsdata.cloud | Unique identifier of the subscriber
subscriber | dataworks_a | Business identifier of the subscriber. There can be multiple subscribers under the same application_id; the subscriber value shows which business party a subscription record belongs to.
subscription_ids | [1001, 1002] | The IDs of the subscription records to cancel
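
The query and unsubscribe calls are typically chained: list the subscriptions, pick the IDs to cancel, then send them to batch-delete. A minimal sketch of the middle step follows, assuming the list response has the shape shown under Query subscribed data; build_unsubscribe_body is a hypothetical helper name.

```python
def build_unsubscribe_body(list_response: dict, application_id: str,
                           subscriber: str) -> dict:
    """Build a batch-delete body covering every matching subscription record."""
    subscriptions = list_response.get("data", {}).get("subscriptions", [])
    ids = [
        s["subscription_id"]
        for s in subscriptions
        # Keep only records that belong to this application_id and subscriber.
        if s.get("application") == application_id
        and s.get("subscriber") == subscriber
    ]
    return {
        "application": application_id,
        "subscriber": subscriber,
        "subscription_ids": ids,
    }
```

This cancels everything for the given subscriber; filter the list further if only some subscriptions should be removed.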

Deleting a Subscriber

If incorrect information was entered when registering a subscriber, or you want to change the Kafka topic that receives the subscribed data, you can call OpenAPI to delete the registered subscriber and register it again.

Before deleting a subscriber, you must first cancel all subscriptions under that subscriber; otherwise the deletion fails.

  • Interface address: http://{host}:8107/api/v3/horizon/v1/data-subscription/application/delete
  • Request method : POST
    • HEADER
      • api-key: #K-fMFGYYlot5H6dxxxxxxxxxxxxxxxxxxxxxx
      • sensorsdata-project: production
  • Request BODY example:

 {
    "application_name": "{application_name}"
}

The complete curl command to delete a subscriber is as follows:

 curl -X POST 'http://{host}:8107/api/v3/horizon/v1/data-subscription/application/delete' \
-H 'api-key: {api-key}' \
-H 'Content-Type: application/json;charset=UTF-8' \
-H 'sensorsdata-project: {project_name}' \
-d '{
    "application_name":"{application_name}"
}'

Frequently asked questions

Can I subscribe to historical data?

No. Subscriptions apply only to newly reported data. If you need historical data, refer to Using JDBC for Data Access.

Possible reasons for not receiving subscribed data

  • Network reasons: By default, SensorsData accesses the Kafka cluster that receives the data by hostname. You need to add the hostname-to-IP mappings of that Kafka cluster to the hosts file on the SensorsData server. You can verify network connectivity by running telnet {kafka_broker_hostname} 9092 on the SensorsData server.

  • Subscription failed: A call to the SensorsData OpenAPI to subscribe to data has succeeded only if the interface returns "code": "SUCCESS".
  • Improper subscriber registration: For each registered subscriber, the SensorsData system starts a corresponding subscription task to send data, which consumes SensorsData server resources. On a stand-alone SensorsData deployment, resources are limited, so by default data is sent only for the first registered subscriber's task, and the remaining subscribers do not receive data.
  • Data not stored in the database: Only data that has been stored is pushed to Kafka. For example, data reported in debug mode without storage does not reach the Kafka cluster that receives the subscribed data.

Disclaimer

  • SensorsData is not responsible for recovering from data push delays caused by an unstable Kafka cluster.
  • SensorsData is not responsible for backfilling data when subscription data accumulates or expires because of instability in the user's Kafka cluster.
  • SensorsData can help locate other data subscription issues caused by the user's environment or by not following this documentation, but does not accept responsibility for them.
Last modified: 2025-07-01