来源:
声明:如果我侵犯了任何人的权利,请联系我,我会删除
欢迎高手来喷我
文章目录
完整项目下载 https://github.com/gqj123/spyne-hbase
Python利用spyne 实现webservice服务
前提:所有相关的网络和端口都可以通信,即 ping 得通、telnet 也连得通
spyne的原理啥的请移步https://www.cnblogs.com/guanfuchang/p/5985070.html,这个哥们的博客会有些介绍,我这里只讲我的应用
Client端发送请求之前,Server端需要先开启并运行Spyne服务;随后Client端(本文使用suds)连接该服务,双方建立TCP连接,然后进行通信
服务端代码server.py: 实现RangeQuery范围查询和PointQuery点查询
在linux服务器中运行这段代码 python server.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 远程调用需要的包
from spyne import Application,rpc,ServiceBase,Iterable,Integer,Unicode
from spyne.protocol.soap import Soap11,Soap12
from spyne.server.wsgi import WsgiApplication
# HBase需要的包
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *
import happybase
import re
import sys
import time
import os
import json
# Address of the HBase Thrift service used by the range query
# (9090 is the port on which HBase serves Thrift requests).
_RANGE_HBASE_HOST = '192.168.1.181'
_RANGE_HBASE_PORT = 9090

# Fields copied from the 'info' column family for every table ...
_RANGE_COMMON_FIELDS = ('time', 'lat', 'lon')
# ... plus the extra fields that depend on which table is scanned.
_RANGE_TABLE_FIELDS = {
    'WL': ('w_level',),
    'AT': ('a_temperature',),
    'SLP': ('pressure',),
    'SST': ('w_temperature',),
    'WIND': ('direction', 'speed'),
    'PSAL': ('deep', 'deep_qc', 'salinity', 'salinity_qc'),
    'TEMP': ('deep', 'deep_qc', 'temperature', 'temperature_qc'),
    'STATION_TEMP': ('deep', 'temperature'),
    'STATION_PSAL': ('deep', 'salinity'),
    'STATION_CUR': ('depth', 'direction', 'speed'),
}


def _range_row_to_record(row, tableName):
    """Convert one scanned row into a plain dict keyed by field name.

    Only the fields configured for ``tableName`` are copied. A field whose
    'info:<field>' cell is absent from the row is left out of the record
    instead of failing on ``None.value``, which matches how Point_Query
    treats missing columns.
    """
    record = {}
    for field in _RANGE_COMMON_FIELDS + _RANGE_TABLE_FIELDS.get(tableName, ()):
        cell = row.columns.get('info:' + field)
        if cell is not None:
            record[field] = cell.value
    return record


class RangeQuery(ServiceBase):
    """SOAP service exposing a row-key range scan over an HBase table."""

    @rpc(Unicode, Unicode, Unicode, Unicode, Unicode, Unicode, Unicode, _returns=Unicode)
    def Range_Query(self, tableName, start_t, stop_t, lat_start, lon_start, lat_stop, lon_stop):
        """Scan ``tableName`` between two row keys and return the rows as JSON.

        The start key is ``start_t + '_' + lat_start + '_' + lon_start`` and
        the stop key is built the same way from the ``*_stop`` arguments. The
        parts are concatenated verbatim (no padding or number formatting is
        applied), so callers must pass them in the form used by the stored
        row keys.

        Args:
            tableName: HBase table to scan; it also selects which extra
                fields are included in each record.
            start_t: Time part of the start key; empty/None means '0'.
            stop_t: Time part of the stop key; empty/None means '2030'.
            lat_start, lon_start: Latitude/longitude parts of the start key.
            lat_stop, lon_stop: Latitude/longitude parts of the stop key.

        Returns:
            A JSON array (as a string) with one object per scanned row.
        """
        # NOTE(review): `self` is not used on purpose; spyne is believed to
        # pass its own call context here rather than a RangeQuery instance,
        # which is why the row conversion lives in a module-level helper.
        # Confirm against the spyne documentation.
        if not start_t:
            start_t = '0'
        if not stop_t:
            stop_t = '2030'
        startKey = start_t + '_' + lat_start + '_' + lon_start
        stopKey = stop_t + '_' + lat_stop + '_' + lon_stop

        transport = TTransport.TBufferedTransport(
            TSocket.TSocket(_RANGE_HBASE_HOST, _RANGE_HBASE_PORT))
        client = Hbase.Client(TBinaryProtocol.TBinaryProtocol(transport))

        records = []
        transport.open()
        try:
            scanner = client.scannerOpenWithStop(tableName, startKey, stopKey, [])
            try:
                while True:
                    # Fetch the next batch of results for this scanner id;
                    # an empty batch means the scan is exhausted.
                    rows = client.scannerGet(scanner)
                    if not rows:
                        break
                    for row in rows:
                        records.append(_range_row_to_record(row, tableName))
            finally:
                # Release the server-side scanner even if the scan failed.
                client.scannerClose(scanner)
        finally:
            # Always release the socket; previously it leaked on every call.
            transport.close()

        return json.dumps(records)
# Host of the HBase Thrift service used by the point query.
_POINT_HBASE_HOST = '192.168.1.181'

# Fields copied from the 'info' column family for every table ...
_POINT_COMMON_FIELDS = ('time', 'lat', 'lon')
# ... plus the extra fields that depend on which table is queried.
_POINT_TABLE_FIELDS = {
    'WL': ('w_level',),
    'AT': ('a_temperature',),
    'SLP': ('pressure',),
    'SST': ('w_temperature',),
    'WIND': ('direction', 'speed'),
    'PSAL': ('deep', 'deep_qc', 'salinity', 'salinity_qc'),
    'TEMP': ('deep', 'deep_qc', 'temperature', 'temperature_qc'),
    'STATION_TEMP': ('deep', 'temperature'),
    'STATION_PSAL': ('deep', 'salinity'),
    'STATION_CUR': ('depth', 'direction', 'speed'),
}


class PointQuery(ServiceBase):
    """SOAP service that fetches a single HBase row by its row key."""

    @rpc(Unicode, Unicode, _returns=Unicode)
    def Point_Query(self, tableName, rowkey):
        """Fetch one row of ``tableName`` and return its fields as JSON.

        Args:
            tableName: HBase table to read; it also selects which extra
                fields are included in the result.
            rowkey: Row key of the row to fetch.

        Returns:
            A JSON object (as a string) mapping field names to the UTF-8
            decoded cell values. Fields whose 'info:<field>' column is not
            present in the row are omitted, so an unknown row key yields
            ``{}``.
        """
        connection = happybase.Connection(_POINT_HBASE_HOST)
        try:
            connection.open()
            row = connection.table(tableName).row(rowkey)
        finally:
            # Always release the connection; previously it leaked per call.
            connection.close()

        # Decode the column names once so they can be looked up directly
        # instead of re-scanning the whole row for every wanted field.
        cells = {}
        for column, value in row.items():
            cells[column.decode("utf8")] = value

        record = {}
        for field in _POINT_COMMON_FIELDS + _POINT_TABLE_FIELDS.get(tableName, ()):
            raw = cells.get('info:' + field)
            if raw is not None:
                record[field] = raw.decode("utf8")

        return json.dumps(record)
# Register BOTH services: the client calls Point_Query as well as
# Range_Query, so PointQuery must be part of the application or that
# operation is never exposed.
application = Application(
    [RangeQuery, PointQuery],
    'http://schemas.xmlsoap.org/soap/envelope',
    in_protocol=Soap11(validator='lxml'),
    out_protocol=Soap11(),
)
wsgi_application = WsgiApplication(application)

if __name__ == '__main__':
    import logging
    from wsgiref.simple_server import make_server

    # The root logger must be at INFO, otherwise the startup messages below
    # are dropped; the spyne XML protocol logger is kept at ERROR.
    logging.basicConfig(level=logging.INFO)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.ERROR)

    logging.info("listening to http://192.168.1.181:8888")
    # Once running, open http://192.168.1.181:8888/?wsdl in a browser; if an
    # XML document is shown, the service started successfully.
    logging.info("wsdl is at: http://192.168.1.181:8888/?wsdl")

    # Any free port may be used here, but it MUST be reachable from the
    # remote clients. 8888 is used in this setup.
    server = make_server('192.168.1.181', 8888, wsgi_application)
    server.serve_forever()
我在Windows的浏览器中访问 http://ip:8888/?wsdl 这个地址,出现xml文件即证明启动成功
客户端代码client.py 运行在本地的编辑器中,我用的idea
# Improved point-query test: fetch a single row from table 'WL' by its row
# key and print the 'time' field of the JSON reply.
from suds.client import Client
import json

client = Client('http://221.239.0.181:8888/?wsdl')
reply = client.service.Point_Query('WL', '1861071407_138.38_175.50')
record = json.loads(reply)
print(record["time"])
# Range-query test: scan one table between two row keys, print the rows,
# their count and the elapsed time.
import json
import time

from suds.client import Client

# Uncomment to read the key components interactively instead of using the
# fixed values below:
# start_t=input("start:")
# lat_start=input("lat_start:")
# lon_start=input("lon_start:")
#
# stop_t=input("stop:")
# lat_stop=input("lat_stop:")
# lon_stop=input("lon_stop:")

# The service declares every parameter as Unicode and concatenates them
# with '+', so the values are passed as strings rather than integers.
tableName = 'WL'
start_t = '2006060000'
lat_start = '0'
lon_start = '0'
stop_t = '2006062000'
lat_stop = '0'
lon_stop = '0'

# perf_counter is used for the elapsed time because, unlike the wall clock,
# it cannot jump backwards while the request is running.
start_time = time.perf_counter()

client = Client('http://221.239.0.181:8888/?wsdl')
# Large scans can take a long time; allow up to one hour for the reply.
client.set_options(timeout=3600)
res = client.service.Range_Query(tableName, start_t, stop_t, lat_start, lon_start, lat_stop, lon_stop)

# The reply is a JSON array with one object per row.
rows = json.loads(res)
print(rows)

end_time = time.perf_counter()
print(len(rows))
print('该程序的运行时间是:' , end_time-start_time)