技术标签: 集成平台 爱投斯 联网平台 相关技术 接口协议 中台驱动 IOTOSystem
本文章为原创,转载请注明出处!
账号:iotos_test 密码:iotos123
代码地址:IOTOSDK-Python: IOTOS Python版本SDK,自带原生接口和采集引擎 (gitee.com)
目录
窄带物联网(Narrow Band Internet of Things, NB-IoT)成为万物互联网络的一个重要分支。NB-IoT构建于蜂窝网络,只消耗大约180kHz的带宽,可直接部署于GSM网络、UMTS网络或LTE网络,以降低部署成本、实现平滑升级。NB-IoT的设备可以连接至三大运营商的IoT平台,方便了设备的对接
将连接至电信平台的NB气体感应报警器的数据拿到并上云展示
品牌为JT-TCN,型号为5010的气体感应报警器 ,能将设备绑定在电信平台
#coding=utf-8
import sys
sys.path.append("..")
from driver import *
import time
import datetime
from urllib import urlencode
import urllib2
import base64
import hmac
import json
from hashlib import sha1
reload(sys)
sys.setdefaultencoding('utf8')
#签名算法
def signature(key, application, timestamp, param, body):
code = "application:" + application + "\n" + "timestamp:" + timestamp + "\n"
for v in param:
code += str(v[0]) + ":" + str(v[1]) + "\n"
if (body is not None) and (body.strip()) :
code += body + '\n'
return base64.b64encode(hash_hmac(key, code, sha1))
#数据加密
def hash_hmac(key, code, sha1):
hmac_code = hmac.new(key.encode(), code.encode(), sha1)
print("hmac_code=" + str(hmac_code.hexdigest()))
return hmac_code.digest()
#时间戳误差调整
def getTimeOffset(url):
request = urllib2.Request(url)
start = int(time.time() * 1000)
response = urllib2.urlopen(request)
end = int(time.time() * 1000)
if response is not None:
return int(int(response.headers['x-ag-timestamp']) - (end + start) / 2);
else:
return 0
baseUrl = 'https://ag-api.ctwing.cn'
timeUrl = 'https://ag-api.ctwing.cn/echo'
offset = getTimeOffset(timeUrl)
#发送http请求函数
def sendSDKRequest(path, head, param, body, version, application, MasterKey, key, method=None, isNeedSort=True,isNeedGetTimeOffset=False):
paramList = []
for key_value in param:
paramList.append([key_value, param[key_value]])
print("paramList=" + str(paramList))
if (MasterKey is not None) and (MasterKey.strip()):
paramList.append(['MasterKey', MasterKey])
if isNeedSort:
paramList = sorted(paramList)
headers = {}
if (MasterKey is not None) and (MasterKey.strip()):
headers['MasterKey'] = MasterKey
headers['application'] = application
headers['Date'] = str(datetime.datetime.now())
headers['version'] = version
temp = dict(param.items())
if (MasterKey is not None) and (MasterKey.strip()):
temp['MasterKey'] = MasterKey
url_params = urlencode(temp)
url = baseUrl + path
if (url_params is not None) and (url_params.strip()):
url = url + '?' + url_params
print("url=" + str(url))
global offset
if isNeedGetTimeOffset:
offset = getTimeOffset(timeUrl)
timestamp = str(int(time.time() * 1000) + offset)
headers['timestamp'] = timestamp
sign = signature(key, application, timestamp, paramList, body)
headers['signature'] = sign
headers.update(head)
print("headers : %s" % (str(headers)))
if (body is not None) and (body.strip()):
request = urllib2.Request(url=url, headers=headers, data=body.encode('utf-8'))
else:
request = urllib2.Request(url=url, headers=headers)
if (method is not None):
request.get_method = lambda: method
response = urllib2.urlopen(request)
if ('response' in vars()):
print("response.code: %d" % (response.code))
return response
else:
return None
#分页查询设备历史数据
def getDeviceStatusHisInPage(appKey, appSecret, body):
path = '/aep_device_status/getDeviceStatusHisInPage'
head = {}
param = {}
version = '20190928013337'
application = appKey
key = appSecret
response = sendSDKRequest(path, head, param, body, version, application, None, key, 'POST')
if response is not None:
return response.read()
return None
#查询事件上报
def QueryDeviceEventList(appKey, appSecret, MasterKey, body):
path = '/aep_device_event/device/events'
head = {}
param = {}
version = '20210327064751'
application = appKey
key = appSecret
response = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')
if response is not None:
return response.read()
return None
#数据下发函数
def CreateCommand(appKey, appSecret, MasterKey, body):
path = '/aep_device_command/command'
head = {}
param = {}
version = '20190712225145'
application = appKey
key = appSecret
response = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')
if response is not None:
return response.read()
return None
class Project(IOTOSDriverI):
def InitComm(self,attrs):
self.setPauseCollect(False)
self.setCollectingOneCircle(False)
self.online(True)
try:
#获取中台配置的参数(必不可少)
self.Nbapplication = self.sysAttrs['config']['param']['application'] # APPKEY
self.Nbkey = self.sysAttrs['config']['param']['key'] # APPScret
self.NbMasterKey=self.sysAttrs['config']['param']['MasterKey'] #MasterKey
# self.NbMasterKey = "5fef44837aa54f42a7a49b73a4d95a15"
self.NbproductId = self.sysAttrs['config']['param']['productId']
self.NbdeviceId = self.sysAttrs['config']['param']['deviceId']
except Exception,e:
self.debug(u'获取参数失败!'+e.message)
def Collecting(self, dataId):
try:
cfgtmp = self.data2attrs[dataId]['config']
# 过滤掉非采集点
if cfgtmp["param"] == "":
return ()
# 过滤采集点
if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:
return ()
else:
self.debug(self.name(dataId))
#请求需要用的参数
timearry = (datetime.datetime.now() + datetime.timedelta(days=-29)).timetuple() # 当前时间减去29天后转化为timetuple的格式用于转换成timestamp格式
begin_timestamp = str(int(time.mktime(timearry) * 1000) + offset)
end_timestamp = str(int(time.time() * 1000) + offset)
page_size = "100"
page_timestamp = ""
# 上传设备上传数据
if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='show_data':
body_HisInPage = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","begin_timestamp":"' + begin_timestamp + '","end_timestamp":"' + end_timestamp + '","page_size":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'
#http请求,拿去设备上报的数据
res = getDeviceStatusHisInPage(self.Nbapplication, self.Nbkey, body_HisInPage)
data = json.loads(res)['deviceStatusList']
self.debug(data)
data_dic = {}
flat = 0
for i in data:
if 'sample_ad_value' in i and flat != 1:
data_dic.update(i)
flat += 1
elif flat == 1:
break
self.debug(data_dic)
data_list = []
for key, value in data_dic.items():
# 拿到时间戳数据时转化为时间
if key == 'timestamp':
value = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(value) / 1000))
if key=='device_status':
status={
0:"正常",
1:"燃气泄漏报警",
2:"传感器故障",
3:"自检",
4:"失效"
}
value=status[value]
if type(value) == unicode:
# unicode转为str类型
value = value.encode('utf-8')
data_list.append(value)
print (data_list)
return tuple(data_list)
# 上传事件上报的数据
if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='gas_event':
#查询事件上报的数据
body_event = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","startTime":"' + begin_timestamp + '","endTime":"' + end_timestamp + '","pageSize":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'
event_res = QueryDeviceEventList(self.Nbapplication, self.Nbkey, self.NbMasterKey, body_event)
self.debug(event_res)
event_data = json.loads(event_res)['result']['list']
# self.debug(event_data)
event_list = []
for i in event_data:
self.debug(i['eventContent'])
if 'gas_sensor_state' in i['eventContent']:
for key, value in eval(i['eventContent']).items():
if key=="gas_sensor_state":
status={
0:"正常",
1:"低浓度报警",
2:"高浓度"
}
value=status[value]
event_list.append(value)
event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))
event_list.append(event_time)
self.debug(event_list)
break
return tuple(event_list)
return ()
except Exception,e:
self.debug(u'数据上传失败'+e.message)
def Event_setData(self, dataId, value):
return json.dumps({'code': 0, 'msg': '', 'data': ''})
#coding=utf-8
import sys
sys.path.append("..")
from driver import *
import time
import datetime
from urllib import urlencode
import urllib2
import base64
import hmac
import json
from hashlib import sha1
reload(sys)
sys.setdefaultencoding('utf8')
#签名算法
def signature(key, application, timestamp, param, body):
code = "application:" + application + "\n" + "timestamp:" + timestamp + "\n"
for v in param:
code += str(v[0]) + ":" + str(v[1]) + "\n"
if (body is not None) and (body.strip()) :
code += body + '\n'
return base64.b64encode(hash_hmac(key, code, sha1))
#数据加密
def hash_hmac(key, code, sha1):
hmac_code = hmac.new(key.encode(), code.encode(), sha1)
print("hmac_code=" + str(hmac_code.hexdigest()))
return hmac_code.digest()
#时间戳误差调整
def getTimeOffset(url):
request = urllib2.Request(url)
start = int(time.time() * 1000)
response = urllib2.urlopen(request)
end = int(time.time() * 1000)
if response is not None:
return int(int(response.headers['x-ag-timestamp']) - (end + start) / 2);
else:
return 0
baseUrl = 'https://ag-api.ctwing.cn'
timeUrl = 'https://ag-api.ctwing.cn/echo'
offset = getTimeOffset(timeUrl)
#发送http请求函数
def sendSDKRequest(path, head, param, body, version, application, MasterKey, key, method=None, isNeedSort=True,isNeedGetTimeOffset=False):
paramList = []
for key_value in param:
paramList.append([key_value, param[key_value]])
print("paramList=" + str(paramList))
if (MasterKey is not None) and (MasterKey.strip()):
paramList.append(['MasterKey', MasterKey])
if isNeedSort:
paramList = sorted(paramList)
headers = {}
if (MasterKey is not None) and (MasterKey.strip()):
headers['MasterKey'] = MasterKey
headers['application'] = application
headers['Date'] = str(datetime.datetime.now())
headers['version'] = version
temp = dict(param.items())
if (MasterKey is not None) and (MasterKey.strip()):
temp['MasterKey'] = MasterKey
url_params = urlencode(temp)
url = baseUrl + path
if (url_params is not None) and (url_params.strip()):
url = url + '?' + url_params
print("url=" + str(url))
global offset
if isNeedGetTimeOffset:
offset = getTimeOffset(timeUrl)
timestamp = str(int(time.time() * 1000) + offset)
headers['timestamp'] = timestamp
sign = signature(key, application, timestamp, paramList, body)
headers['signature'] = sign
headers.update(head)
print("headers : %s" % (str(headers)))
if (body is not None) and (body.strip()):
request = urllib2.Request(url=url, headers=headers, data=body.encode('utf-8'))
else:
request = urllib2.Request(url=url, headers=headers)
if (method is not None):
request.get_method = lambda: method
response = urllib2.urlopen(request)
if ('response' in vars()):
print("response.code: %d" % (response.code))
return response
else:
return None
#分页查询设备历史数据
def getDeviceStatusHisInPage(appKey, appSecret, body):
path = '/aep_device_status/getDeviceStatusHisInPage'
head = {}
param = {}
version = '20190928013337'
application = appKey
key = appSecret
response = sendSDKRequest(path, head, param, body, version, application, None, key, 'POST')
if response is not None:
return response.read()
return None
#查询事件上报
def QueryDeviceEventList(appKey, appSecret, MasterKey, body):
path = '/aep_device_event/device/events'
head = {}
param = {}
version = '20210327064751'
application = appKey
key = appSecret
response = sendSDKRequest(path, head, param, body, version, application, MasterKey, key, 'POST')
if response is not None:
return response.read()
return None
class Project(IOTOSDriverI):
def InitComm(self,attrs):
self.setPauseCollect(False)
self.setCollectingOneCircle(False)
self.online(True)
try:
#获取中台配置的参数(必不可少)
self.Nbapplication = self.sysAttrs['config']['param']['application'] # APPKEY
self.Nbkey = self.sysAttrs['config']['param']['key'] # APPScret
self.NbMasterKey=self.sysAttrs['config']['param']['MasterKey'] #MasterKey
# self.NbMasterKey = "5fef44837aa54f42a7a49b73a4d95a15"
self.NbproductId = self.sysAttrs['config']['param']['productId']
self.NbdeviceId = self.sysAttrs['config']['param']['deviceId']
except Exception,e:
self.debug(u'获取参数失败!'+e.message)
def Collecting(self, dataId):
try:
cfgtmp = self.data2attrs[dataId]['config']
# 过滤掉非采集点
if cfgtmp["param"] == "":
return ()
# 过滤采集点
if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:
return ()
else:
self.debug(self.name(dataId))
#请求需要用的参数
timearry = (datetime.datetime.now() + datetime.timedelta(days=-29)).timetuple() # 当前时间减去29天后转化为timetuple的格式用于转换成timestamp格式
begin_timestamp = str(int(time.mktime(timearry) * 1000) + offset)
end_timestamp = str(int(time.time() * 1000) + offset)
page_size = "100"
page_timestamp = ""
# 上传设备上传数据
if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='show_data':
body_HisInPage = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","begin_timestamp":"' + begin_timestamp + '","end_timestamp":"' + end_timestamp + '","page_size":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'
#http请求,拿去设备上报的数据
res = getDeviceStatusHisInPage(self.Nbapplication, self.Nbkey, body_HisInPage)
data = json.loads(res)['deviceStatusList']
self.debug(data)
data_dic = {}
flat = 0
for i in data:
if 'sample_ad_value' in i and flat != 1:
data_dic.update(i)
flat += 1
elif flat == 1:
break
self.debug(data_dic)
data_list = []
for key, value in data_dic.items():
# 拿到时间戳数据时转化为时间
if key == 'timestamp':
value = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(value) / 1000))
if key=='device_status':
status={
0:"正常",
1:"燃气泄漏报警",
2:"传感器故障",
3:"自检",
4:"失效"
}
value=status[value]
if type(value) == unicode:
# unicode转为str类型
value = value.encode('utf-8')
data_list.append(value)
print (data_list)
return tuple(data_list)
# 上传事件上报的数据
if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='gas_event':
#查询事件上报的数据
body_event = '{"productId":"' + self.NbproductId + '","deviceId":"' + self.NbdeviceId + '","startTime":"' + begin_timestamp + '","endTime":"' + end_timestamp + '","pageSize":' + page_size + ',"page_timestamp":"' + page_timestamp + '"}'
event_res = QueryDeviceEventList(self.Nbapplication, self.Nbkey, self.NbMasterKey, body_event)
self.debug(event_res)
event_data = json.loads(event_res)['result']['list']
# self.debug(event_data)
event_list = []
for i in event_data:
self.debug(i['eventContent'])
if 'gas_sensor_state' in i['eventContent']:
for key, value in eval(i['eventContent']).items():
if key=="gas_sensor_state":
status={
0:"正常",
1:"低浓度报警",
2:"高浓度"
}
value=status[value]
event_list.append(value)
event_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i['createTime']) / 1000))
event_list.append(event_time)
self.debug(event_list)
break
return tuple(event_list)
return ()
except Exception,e:
self.debug(u'数据上传失败'+e.message)
def Event_setData(self, dataId, value):
return json.dumps({'code': 0, 'msg': '', 'data': ''})
文章浏览阅读598次。MACROBUTTON MTEditEquationSection2 SEQ MTEqn \r \h \* MERGEFORMAT SEQ MTSec \r 1 \h \* MERGEFORMAT SEQ MTChap \r 1 \h \* MERGEFORMAT 毕业设计 课程设计 毕业论文 详细资料 联系QQ号;1620812008本科毕业设计(论文)基于MATLAB的IIR滤波..._matlab设计iir滤波器为高通滤波器,截至频率为0.5π
文章浏览阅读6.5k次,点赞11次,收藏26次。一.准备工作Ⅰ.git下载和安装要连接GitHub,首先git是必不可少的,git的安装的基本使用很简单,这里并不是讲git使用的教程,所以只是提一下就略过了.Ⅱ.GitHub设置添加SSH Key这一步算是连接GitHub的最基本的一步了,git是分布式的代码管理工具,远程的代码管理是基于ssh的,所以得先配好SSH key.1.创建一个SSH Key打开终端,windows下面可能叫做git ..._git commit为什么需要邮箱和名字、
文章浏览阅读2.4k次,点赞8次,收藏9次。链接地址:https://www.cs.usfca.edu/~galles/visualization/BPlusTree.html数据结构可视化:https://www.cs.usfca.edu/~galles/visualization/Algorithms.html_在线体验b+tree
文章浏览阅读5.6k次。X<-c(1,2,3,NA,5) y<-c(236,90,56,NA,7) z<-c(54,6558,5,21,5) x1<-data.frame(X,y,z) r<-c(NA,1,2,NA,NA)ee> ee X y z r1 1 236 54 NA2 2 90 6558 13 3 56 5 24 NA NA 21 NA5 5_ggto05.net
文章浏览阅读377次。常用的判别分析方法是距离判别、贝叶斯判别和Fisher判别等。_判别分析的应用案例r语言
文章浏览阅读124次。1、依赖管理1、依赖配置依赖指当前项目运行所需的jar,一个项目可以设置多个依赖2、依赖传递将project03的坐标复制到project02的依赖中直接依赖:在当前项目中通过依赖配置建立的依赖关系间接依赖∶被资源的资源如果依赖其他资源,当前项目间接依赖其他资源路径优先:当依赖中出现相同的资源时,层级越深,优先级越低,层级越浅,优先级越高声明优先:当资源在相同层级被依赖时,配置顺序靠前的覆盖配置顺序靠后的特殊优先:当同级配置了相同资源的不同_项目依赖管理生命周期
文章浏览阅读2.5k次。首先对current_thread_info不熟悉的同学可以先百度一下这是个什么东西?我们这里仅仅简单的提一下,current_thread_info用于获取当前进程的信息。ARM32平台相关定义#define THREAD_SIZE_ORDER 1#define THREAD_SIZE (PAGE_SIZE << THREAD_SIZE_ORDER)union thr..._arm64 thread_info
文章浏览阅读121次。前言随着计算机硬件的不断发展,CPU进入多核时代,并发运算因此也上升到了并行运算的高度,作为Java程序员,如何最大化地“压榨”CPU的资源,利用CPU超高的计算速度编写高效快速运行的程序,如何解决CPU与RAM之间速度不匹配的问题呢?今天分享一份《Java高并发编程详解多线程与架构设计》就会帮你解决这些问题,本书结合作者的实践经验,不仅介绍了高速缓存Cache、Cache Line、Java内存模型,以及现代CPU基本架构等相关知识,还深入讲解了Java跨平台的高并发解决方案。本书实._阿里线程框架
文章浏览阅读1.8k次。我们网站使用的服务器环境一般有IIS、Apache、Tomcat、Nginx 等,他们各有优劣。一般虚拟空间使用IIS 和 Apache居多,这两个的配置都比较简单,IIS一般都安装了ISAPI_Rewrite 模块,可以直接使用和Apache一样的.htaccess文件来配置,直接将.htaccess文件上传到网站根目录即可。.htaccess 文件代码RewriteEngine OnRewri..._宝塔怎么添加htaccess规则
文章浏览阅读189次。在HbuilderX中打开前端项目,依次点击 运行 =》 运行到小程序模拟器 =》微信开发者工具。必要工具:HbuilderX,微信开发者工具。_博客小程序源码
文章浏览阅读1.1k次。1、组合索引能够避免回表查询:假设有一张订单表(orders),包含order_id和product_id二个字段。一共有31条数据。符合下面语句的数据有5条。执行下面的sql语句:select product_id from orderswhere order_id in (123, 312, 223, 132, 224);这条语句要mysql去根据order_id进行搜索,然后返回_假设有一张名为orders的订单表
文章浏览阅读7k次。前言在jenkins上展示html的报告,需要添加一个HTML Publisher plugin插件,把生成的html报告放到指定文件夹,这样就能用jenkins去读出指定文件夹的报告了。一、构建后操作1.前面执行完测试用例后,可以用“添加构建后操作步骤”,读出html报告文件2.如果你的展开后有Publish THML reports这_jenkins + python + publish html reports