加载文件
数据结构
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
AAPL | 28-01-2011 | 344.17 | 344.4 | 333.53 | 336.1 | 21144800 | |
date | open | lowest | heightest | close |
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, closing_prices, color='dodgerblue', linestyle=":")
mp.gcf().autofmt_xdate()
mp.show()
K线
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
mp.figure("candle lines", facecolor='lightgray')
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
colors = np.zeros_like(dates, dtype='U10')
colors[closing_prices > opening_prices] = 'white'
colors[closing_prices < opening_prices] = 'limegreen'
ecolors = np.zeros_like(colors)
ecolors[closing_prices > opening_prices] = 'red'
ecolors[closing_prices < opening_prices] = 'limegreen'
mp.vlines(dates, highest_prices, lowest_prices, color=ecolors)
mp.bar(dates, closing_prices - opening_prices, 0.8, opening_prices, color=colors, edgecolor=ecolors, zorder=3)
mp.gcf().autofmt_xdate()
mp.show()
均值
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
mean = np.mean(closing_prices)
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, closing_prices, color='dodgerblue', linestyle=":", label='ClossingPrices')
mp.hlines(mean, dates[0], dates[-1], linewidth=2, color='limegreen', label='Mean')
mp.gcf().autofmt_xdate()
mp.legend()
mp.show()
加权平均
# 权重 w = [w1, w2, w3, ... , wn]
# 样本 s = [s1, s2, s3, ... , sn]
# m = (s1xw1 + s2xw2 + ... + snwn) / (w1 + w2 + ... + wn)
# np.average(s,w)
最值
np.max() np.min mp.ptp: 最大、最小、极差
np.argmax() mp.argmin 最大、小值下标
np.maximum np.minimum() 从两个数组中获取最大小值
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
min_val = np.min(lowest_prices)
max_val = np.max(highest_prices)
print(min_val, max_val)
ptp = np.ptp(lowest_prices)
print(ptp, '-->ptp')
print(np.max(lowest_prices) - np.min(lowest_prices))
# 哪一天?
min_val_index = np.argmin(lowest_prices)
max_val_index = np.argmax(highest_prices)
print(dates[min_val_index])
print(dates[max_val_index])
a = np.arange(1,10).reshape(3,3)
b = np.arange(1,10)[::-1].reshape(3,3)
print(a)
print(b)
c = np.maximum(a,b) # 保留大的数
d = np.minimum(a,b) # 保留小的数
print(c, '--c')
print(d, '--d')
中位数
mean = (a[int(n/2)] + a[int((n-1)/2)] ) / 2
import numpy as np
closing_prices = np.loadtxt( '../../data/aapl.csv',
delimiter=',', usecols=(6), unpack=True)
size = closing_prices.size
sorted_prices = np.msort(closing_prices)
median = (sorted_prices[int((size - 1) / 2)] + sorted_prices[int(size / 2)]) / 2
print(median)
median = np.median(closing_prices)
print(median)
标准差
样本:S = [s1, s2, …, sn]
平均值:m = (s1+s2+…+sn)/n
离差:D = [d1, d2, …, dn], di = si-m
离差方:Q = [q1, q2, …, qn], qi = di2
总体方差:v = (q1+q2+…+qn)/n
总体标准差:s = sqrt(v),方均根
样本方差:v’ = (q1+q2+…+qn)/(n-1)
样本标准差:s’ = sqrt(v’),方均根
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
np_std = np.std(closing_prices, ddof=0) # ddof = 1 样本标准差
print(np_std)
mean = np.mean(closing_prices)
D = (closing_prices - mean) ** 2
v = np.mean(D)
s = np.sqrt(v)
print(s)
时间数据处理
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2wday(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, '%d-%m-%Y')
return time.weekday() # 周一 : 0
dates, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 6),unpack=True,
converters={1: dmy2wday}
)
ave_prices = np.zeros(5)
for i in range(ave_prices.size):
ave_prices[i] = np.mean(closing_prices[dates == i])
mp.plot(np.arange(0,ave_prices.size), ave_prices)
mp.scatter(np.argmax(ave_prices), np.max(ave_prices), s = 40, color='orangered')
mp.scatter(np.argmin(ave_prices), np.min(ave_prices), s = 40, color='orangered')
mp.show()
数组的轴向汇总
import numpy as np
ary = np.arange(1,37).reshape(6, 6)
def apply(data):
return data.mean()
r = np.apply_along_axis(apply, 1, ary)
r1 = np.apply_along_axis(apply, 0, ary)
print(r)
print(r1)
移动均线
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
n = 5
ma5 = np.zeros(closing_prices.size - n+1)
for i in range(ma5.size):
ma5[i] = closing_prices[i:i+n].mean()
n = 10
ma10 = np.zeros(closing_prices.size - n+1)
for i in range(ma10.size):
ma10[i] = closing_prices[i:i+n].mean()
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, closing_prices, color='dodgerblue', linestyle=":", label='colsing_prices', alpha=0.4)
mp.plot(dates[4:], ma5, color='limegreen', linestyle="-", label='ma5')
mp.plot(dates[9:], ma10, color='orangered', linestyle="-", label='ma10')
mp.gcf().autofmt_xdate()
mp.legend()
mp.show()
卷积
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
n = 5
ma5 = np.zeros(closing_prices.size - n+1)
for i in range(ma5.size):
ma5[i] = closing_prices[i:i+n].mean()
n = 10
ma10 = np.zeros(closing_prices.size - n+1)
for i in range(ma10.size):
ma10[i] = closing_prices[i:i+n].mean()
n = 5
cma5 = np.convolve(closing_prices, np.ones(n) / n,mode='valid')
n = 10
cma10 = np.convolve(closing_prices, np.ones(n) / n,mode='valid')
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, closing_prices, color='dodgerblue', linestyle=":", label='colsing_prices', alpha=0.4)
mp.plot(dates[4:], ma5, color='limegreen', linestyle="-", label='ma5')
mp.plot(dates[4:], cma5, color='red', linestyle=":", label='cma5', linewidth=3, alpha=0.4)
mp.plot(dates[9:], ma10, color='orangered', linestyle="-", label='ma10')
mp.plot(dates[9:], cma10, color='green', linestyle=":", label='cma10', linewidth=3, alpha=0.4)
mp.gcf().autofmt_xdate()
mp.legend()
mp.show()
加权卷积
卷积核不一样
布林带
中轨:移动平均线
上轨:中轨+2x5日收盘价标准差 (顶部的压力)
下轨:中轨-2x5日收盘价标准差 (底部的支撑力)
# 标准差
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
n = 5
ma5 = np.convolve(closing_prices, np.ones(shape=(5,), dtype='f8') / n, mode='valid')
# 5日标准关差
std5 = np.zeros_like(ma5)
for i in range(std5.size):
std5[i] = np.std(closing_prices[i:i+n])
u_ma5 = ma5 + std5*2
d_ma5 = ma5 - std5*2
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, closing_prices, color='dodgerblue', linestyle=":", label='closing_prices')
mp.plot(dates[4:], ma5, color='orangered', linestyle='-', label='ma5')
mp.plot(dates[4:], u_ma5, color='limegreen', linestyle='-', label='upper')
mp.plot(dates[4:], d_ma5, color='blue', linestyle='-', label=''lower)
mp.gcf().autofmt_xdate()
mp.legend()
mp.show()
线性模型
x = np.linalg.lstsq(A,B)
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
A = np.zeros(shape=(3,3))
pred_prices = np.zeros(30-6)
for i in range(pred_prices.size):
for j in range(3):
A[j,] = closing_prices[j+i:i+j+3]
B = closing_prices[i+3:i+6]
x = np.linalg.lstsq(A, B)[0]
pred_prices[i] = B.dot(x)
print(pred_prices)
mp.grid(linestyle=":")
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, closing_prices, color='dodgerblue', linestyle=":", label='closing_prices')
mp.plot(dates[6:], pred_prices, "o-",color='limegreen')
mp.gcf().autofmt_xdate()
mp.legend()
mp.show()
线性拟合
import numpy as np
import datetime as dt
import matplotlib.pyplot as mp
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, opening_prices, highest_prices, lowest_prices, closing_prices\
= np.loadtxt('../da_data/aapl.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6),unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd}
)
mp.grid(linestyle=":")
trend_prices = (highest_prices + lowest_prices + closing_prices) / 3
# 日期转 数据
days = dates.astype('M8[D]').astype('int32')
A = np.column_stack((days,np.ones_like(days)))
B = closing_prices
x = np.linalg.lstsq(A, B)[0]
y = x[0] * days + x[1]
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
ax.xaxis.set_minor_locator(md.DayLocator())
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, trend_prices, color='dodgerblue', linestyle=":")
mp.scatter(dates, trend_prices, marker='o', color='orangered', s=30)
mp.plot(dates, y, '-', color='limegreen')
mp.gcf().autofmt_xdate()
mp.show()
协方差、相关矩阵、相关系数
通过两组统计数据计算而得的协方差可以评估这两组统计数据的相似程度。
样本:
A = [a1, a2, ..., an]
B = [b1, b2, ..., bn]
平均值:
ave_a = (a1 + a2 +...+ an)/n
ave_b = (b1 + b2 +...+ bn)/n
离差(用样本中的每一个元素减去平均数,求得数据的误差程度):
dev_a = [a1, a2, ..., an] - ave_a
dev_b = [b1, b2, ..., bn] - ave_b
协方差
协方差可以简单反映两组统计样本的相关性,值为正,则为正相关;值为负,则为负相关,绝对值越大相关性越强。
cov_ab = ave(dev_a x dev_b)
cov_ba = ave(dev_b x dev_a)
import numpy as np
import matplotlib.pyplot as mp
a = np.random.randint(1, 30, 10)
b = np.random.randint(1, 30, 10)
#平均值
ave_a = np.mean(a)
ave_b = np.mean(b)
#离差
dev_a = a - ave_a
dev_b = b - ave_b
#协方差
cov_ab = np.mean(dev_a*dev_b)
cov_ba = np.mean(dev_b*dev_a)
print('a与b数组:', a, b)
print('a与b样本方差:', np.sum(dev_a**2)/(len(dev_a)-1), np.sum(dev_b**2)/(len(dev_b)-1))
print('a与b协方差:',cov_ab, cov_ba)
#绘图,查看两条图线的相关性
mp.figure('COV LINES', facecolor='lightgray')
mp.title('COV LINES', fontsize=16)
mp.xlabel('x', fontsize=14)
mp.ylabel('y', fontsize=14)
x = np.arange(0, 10)
#a,b两条线
mp.plot(x, a, color='dodgerblue', label='Line1')
mp.plot(x, b, color='limegreen', label='Line2')
#a,b两条线的平均线
mp.plot([0, 9], [ave_a, ave_a], color='dodgerblue', linestyle='--', alpha=0.7, linewidth=3)
mp.plot([0, 9], [ave_b, ave_b], color='limegreen', linestyle='--', alpha=0.7, linewidth=3)
mp.grid(linestyle='--', alpha=0.5)
mp.legend()
mp.tight_layout()
mp.show()
相关系数
协方差除去两组统计样本标准差的乘积是一个[-1, 1]之间的数。该结果称为统计样本的相关系数。
# a组样本 与 b组样本做对照后的相关系数
cov_ab/(std_a x std_b)
# b组样本 与 a组样本做对照后的相关系数
cov_ba/(std_b x std_a)
# a样本与a样本作对照 b样本与b样本做对照 二者必然相等
cov_ab/(std_a x std_b)=cov_ba/(std_b x std_a)
通过相关系数可以分析两组数据的相关性:
若相关系数越接近于0,越表示两组样本越不相关。
若相关系数越接近于1,越表示两组样本正相关。
若相关系数越接近于-1,越表示两组样本负相关。
案例:输出案例中两组数据的相关系数。
print('相关系数:', cov_ab/(np.std(a)*np.std(b)), cov_ba/(np.std(a)*np.std(b)))
相关矩阵
矩阵正对角线上的值都为1。(同组样本自己相比绝对正相关)
numpy提供了求得相关矩阵的API:
# 相关矩阵
numpy.corrcoef(a, b)
# 相关矩阵的分子矩阵 (协方差矩阵)
# [[a方差,ab协方差], [ba协方差, b方差]]
numpy.cov(a, b)
使用场景
相关性推荐(相似用户?)
多项式拟合
多项式的一般形式:
多项式拟合的目的是为了找到一组p0-pn,使得拟合方程尽可能的与实际样本数据相符合。
假设拟合得到的多项式如下:
则拟合函数与真实结果的差方如下
那么多项式拟合的过程即为求取一组p0-pn,使得loss的值最小。
X = [x1, x2, ..., xn] - 自变量
Y = [y1, y2, ..., yn] - 实际函数值
Y'= [y1',y2',...,yn'] - 拟合函数值
P = [p0, p1, ..., pn] - 多项式函数中的系数
根据一组样本,并给出最高次幂,求出拟合系数
np.polyfit(X, Y, 最高次幂)->P
多项式运算相关API:
根据拟合系数与自变量求出拟合值, 由此可得拟合曲线坐标样本数据 [X, Y']
np.polyval(P, X)->Y'
多项式函数求导,根据拟合系数求出多项式函数导函数的系数
np.polyder(P)->Q
已知多项式系数Q 求多项式函数的根(与x轴交点的横坐标)
xs = np.roots(Q)
两个多项式函数的差函数的系数(可以通过差函数的根求取两个曲线的交点)
Q = np.polysub(P1, P2)
案例:求多项式 y = 4x3 + 3x2 - 1000x + 1曲线驻点的坐标。
import numpy as np
import matplotlib.pyplot as mp
x = np.linspace(-100, 100, 1000)
p = np.array([4, 3, -1000, 1])
y = np.polyval(p,x)
q = np.polyder(p)
roots = np.roots(q)
ys = np.polyval(p, roots)
print(ys)
mp.hlines(ys, -100, 100)
mp.plot(x, y)
mp.show()
数据平滑
数据的平滑处理通常包含有降噪、拟合等操作。降噪的功能意在去除额外的影响因素,拟合的目的意在数学模型化,可以通过更多的数学方法识别曲线特征。
案例:绘制两只股票收益率曲线。收益率 =(后一天收盘价-前一天收盘价) / 前一天收盘价
- 使用卷积完成数据降噪。
import numpy as np
import matplotlib.pyplot as mp
import datetime as dt
import matplotlib.dates as md
def dmy2ymd(dmy):
dmy = str(dmy,encoding='utf8')
time = dt.datetime.strptime(dmy, "%d-%m-%Y")
return time.strftime("%Y-%m-%d")
dates, bhp_closing_prices = np.loadtxt( r"../da_data/aapl.csv", delimiter=',', usecols=(1,6), dtype='M8[D], f8',converters={1:dmy2ymd}, unpack=True)
vale_closing_prices = np.loadtxt( '../da_data/bhp.csv', delimiter=',', usecols=(6), dtype='f8',converters={1:dmy2ymd}, unpack=True)
bhp_returns = np.diff(bhp_closing_prices) / bhp_closing_prices[:-1]
vale_returns = np.diff(vale_closing_prices) / vale_closing_prices[:-1]
dates = dates[:-1]
#卷积降噪
convolve_core = np.hanning(8)
convolve_core /= convolve_core.sum()
bhp_returns_convolved = np.convolve(bhp_returns, convolve_core, 'valid')
vale_returns_convolved = np.convolve(vale_returns, convolve_core, 'valid')
#绘制这条曲线
mp.figure('BHP VALE RETURNS', facecolor='lightgray')
mp.title('BHP VALE RETURNS', fontsize=20)
mp.xlabel('Date')
mp.ylabel('Price')
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%Y %m %d'))
dates = dates.astype('M8[D]')
#绘制收益线
mp.plot(dates, bhp_returns, color='dodgerblue', linestyle='--', label='bhp_returns', alpha=0.3)
mp.plot(dates, vale_returns, color='orangered', linestyle='--', label='vale_returns', alpha=0.3)
#绘制卷积降噪线
mp.plot(dates[7:], bhp_returns_convolved, color='dodgerblue', label='bhp_returns_convolved', alpha=0.5)
mp.plot(dates[7:], vale_returns_convolved, color='orangered', label='vale_returns_convolved', alpha=0.5)
mp.show()
- 对处理过的股票收益率做多项式拟合。
#拟合这两条曲线,获取两组多项式系数
dates = dates.astype(int)
bhp_p = np.polyfit(dates[7:], bhp_returns_convolved, 3)
bhp_polyfit_y = np.polyval(bhp_p, dates[7:])
vale_p = np.polyfit(dates[7:], vale_returns_convolved, 3)
vale_polyfit_y = np.polyval(vale_p, dates[7:])
#绘制拟合线
mp.plot(dates[7:], bhp_polyfit_y, color='dodgerblue', label='bhp_returns_polyfit')
mp.plot(dates[7:], vale_polyfit_y, color='orangered', label='vale_returns_polyfit')
- 通过获取两个函数的焦点可以分析两只股票的投资收益比。
#求两条曲线的交点 f(bhp) = f(vale)的根
sub_p = np.polysub(bhp_p, vale_p)
roots_x = np.roots(sub_p) # 让f(bhp) - f(vale) = 0 函数的两个根既是两个函数的焦点
roots_x = roots_x.compress( (dates[0] <= roots_x) & (roots_x <= dates[-1]))
roots_y = np.polyval(bhp_p, roots_x)
#绘制这些点
mp.scatter(roots_x, roots_y, marker='D', color='green', s=60, zorder=3)
符号数组
sign函数可以把样本数组的变成对应的符号数组,正数变为1,负数变为-1,0则变为0。
ary = np.sign(源数组)
净额成交量(OBV)
成交量可以反映市场对某支股票的人气,而成交量是一只股票上涨的能量。一支股票的上涨往往需要较大的成交量。而下跌时则不然。
若相比上一天的收盘价上涨,则为正成交量;若相比上一天的收盘价下跌,则为负成交量。
绘制OBV柱状图
dates, closing_prices, volumes = np.loadtxt(
'../../data/bhp.csv', delimiter=',',
usecols=(1, 6, 7), unpack=True,
dtype='M8[D], f8, f8', converters={1: dmy2ymd})
diff_closing_prices = np.diff(closing_prices)
sign_closing_prices = np.sign(diff_closing_prices)
obvs = volumes[1:] * sign_closing_prices
mp.figure('On-Balance Volume', facecolor='lightgray')
mp.title('On-Balance Volume', fontsize=20)
mp.xlabel('Date', fontsize=14)
mp.ylabel('OBV', fontsize=14)
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
mp.tick_params(labelsize=10)
mp.grid(axis='y', linestyle=':')
dates = dates[1:].astype(md.datetime.datetime)
mp.bar(dates, obvs, 1.0, color='dodgerblue',
edgecolor='white', label='OBV')
mp.legend()
mp.gcf().autofmt_xdate()
mp.show()
数组处理函数
ary = np.piecewise(源数组, 条件序列, 取值序列)
针对源数组中的每一个元素,检测其是否符合条件序列中的每一个条件,符合哪个条件就用取值系列中与之对应的值,表示该元素,放到目标 数组中返回。
条件序列: [a < 0, a == 0, a > 0]
取值序列: [-1, 0, 1]
a = np.array([70, 80, 60, 30, 40])
d = np.piecewise(
a,
[a < 60, a == 60, a > 60],
[-1, 0, 1])
# d = [ 1 1 0 -1 -1]
失量化
矢量化指的是用数组代替标量来操作数组里的每个元素。
numpy提供了vectorize函数,可以把处理标量的函数矢量化,返回的函数可以直接处理ndarray数组。
import math as m
import numpy as np
def foo(x, y):
return m.sqrt(x**2 + y**2)
x, y = 1, 4
print(foo(x, y))
X, Y = np.array([1, 2, 3]), np.array([4, 5, 6])
vectorized_foo = np.vectorize(foo)
print(vectorized_foo(X, Y))
print(np.vectorize(foo)(X, Y))
numpy还提供了frompyfunc函数,也可以完成与vectorize相同的功能:
# 把foo转换成矢量函数,该矢量函数接收2个参数,返回一个结果
fun = np.frompyfunc(foo, 2, 1)
fun(X, Y)
案例:定义一种买进卖出策略,通过历史数据判断这种策略是否值得实施。
dates, opening_prices, highest_prices, \
lowest_prices, closing_prices = np.loadtxt(
'../../data/bhp.csv', delimiter=',',
usecols=(1, 3, 4, 5, 6), unpack=True,
dtype='M8[D], f8, f8, f8, f8',
converters={1: dmy2ymd})
# 定义一种投资策略
def profit(opening_price, highest_price,
lowest_price, closing_price):
buying_price = opening_price * 0.99
if lowest_price <= buying_price <= highest_price:
return (closing_price - buying_price) * \
100 / buying_price
return np.nan # 无效值
# 矢量化投资函数
profits = np.vectorize(profit)(opening_prices,
highest_prices, lowest_prices, closing_prices)
nan = np.isnan(profits)
dates, profits = dates[~nan], profits[~nan]
gain_dates, gain_profits = dates[profits > 0], profits[profits > 0]
loss_dates, loss_profits = dates[profits < 0], profits[profits < 0]
mp.figure('Trading Simulation', facecolor='lightgray')
mp.title('Trading Simulation', fontsize=20)
mp.xlabel('Date', fontsize=14)
mp.ylabel('Profit', fontsize=14)
ax = mp.gca()
ax.xaxis.set_major_locator(md.WeekdayLocator(byweekday=md.MO))
ax.xaxis.set_minor_locator(md.DayLocator())
ax.xaxis.set_major_formatter(md.DateFormatter('%d %b %Y'))
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
if dates.size > 0:
dates = dates.astype(md.datetime.datetime)
mp.plot(dates, profits, c='gray',
label='Profit')
mp.axhline(y=profits.mean(), linestyle='--',
color='gray')
if gain_dates.size > 0:
gain_dates = gain_dates.astype(md.datetime.datetime)
mp.plot(gain_dates, gain_profits, 'o',
c='orangered', label='Gain Profit')
mp.axhline(y=gain_profits.mean(), linestyle='--',
color='orangered')
if loss_dates.size > 0:
loss_dates = loss_dates.astype(md.datetime.datetime)
mp.plot(loss_dates, loss_profits, 'o',
c='limegreen', label='Loss Profit')
mp.axhline(y=loss_profits.mean(), linestyle='--',
color='limegreen')
mp.legend()
mp.gcf().autofmt_xdate()
mp.show()
矩阵
矩阵是numpy.matrix类类型的对象,该类继承自numpy.ndarray,任何针对多维数组的操作,对矩阵同样有效,但是作为子类矩阵又结合其自身的特点,做了必要的扩充,比如:乘法计算、求逆等。
矩阵对象的创建
# 如果copy的值为True(缺省),所得到的矩阵对象与参数中的源容器各自拥有独立的数据拷贝,数据不共享。
numpy.matrix(
ary, # 任何可被解释为矩阵的二维容器
copy=True # 是否复制数据(缺省值为True,即复制数据)
)
# 等价于:numpy.matrix(..., copy=False)
# 由该函数创建的矩阵对象与参数中的源容器一定共享数据,无法拥有独立的数据拷贝
numpy.mat(任何可被解释为矩阵的二维容器)
# 该函数可以接受字符串形式的矩阵描述:
# 数据项通过空格分隔,数据行通过分号分隔。例如:'1 2 3; 4 5 6'
numpy.mat(拼块规则)
矩阵的乘法运算
# 矩阵的乘法:乘积矩阵的第i行第j列的元素等于
# 被乘数矩阵的第i行与乘数矩阵的第j列的点积
#
# 1 2 6
# X----> 3 5 7
# | 4 8 9
# |
# 1 2 6 31 60 74
# 3 5 7 46 87 116
# 4 8 9 64 120 161
e = np.mat('1 2 6; 3 5 7; 4 8 9')
print(e * e)
清晰的认识到:
A与B能不能相乘
乘积结果矩阵的维度是多少
矩阵的逆矩阵
若两个矩阵A、B满足:AB = BA = E (E为单位矩阵),则成为A、B为逆矩阵。
e = np.mat('1 2 6; 3 5 7; 4 8 9')
print(e.I)
print(e * e.I)
ndarray提供了方法让多维数组替代矩阵的运算:
a = np.array([
[1, 2, 6],
[3, 5, 7],
[4, 8, 9]])
# 点乘法求ndarray的点乘结果,与矩阵的乘法运算结果相同
k = a.dot(a)
print(k)
# linalg模块中的inv方法可以求取a的逆矩阵
l = np.linalg.inv(a)
print(l)
案例:假设一帮孩子和家长出去旅游,去程坐的是bus,小孩票价为3元,家长票价为3.2元,共花了118.4;回程坐的是Train,小孩票价为3.5元,家长票价为3.6元,共花了135.2。分别求小孩和家长的人数。使用矩阵求解。
$$
\left[ \begin{array}{ccc}
3 & 3.2 \
3.5 & 3.6 \
\end{array} \right]
\times
\left[ \begin{array}{ccc}
x \
y \
\end{array} \right]
\left[ \begin{array}{ccc}
118.4 \
135.2 \
\end{array} \right]
$$
import numpy as np
prices = np.mat('3 3.2; 3.5 3.6')
totals = np.mat('118.4; 135.2')
persons = prices.I * totals
print(persons)
案例:斐波那契数列
1 1 2 3 5 8 13 21 34 …
X 1 1 1 1 1 1
1 0 1 0 1 0
--------------------------------
1 1 2 1 3 2 5 3
1 0 1 1 2 1 3 2
F^1 F^2 F^3 F^4 ... f^n
代码
import numpy as np
n = 35
# 使用递归实现斐波那契数列
def fibo(n):
return 1 if n < 3 else fibo(n - 1) + fibo(n - 2)
print(fibo(n))
# 使用矩阵实现斐波那契数列
print(int((np.mat('1. 1.; 1. 0.') ** (n - 1))[0, 0]))
通用函数
数组的裁剪
# 将调用数组中小于和大于下限和上限的元素替换为下限和上限,返回裁剪后的数组,调
# 用数组保持不变。
ndarray.clip(min=下限, max=上限)
any /all
a = np.arange(9)
print(a[np.any([a < 7, a % 2 == 0], axis=0)])
数组的压缩
# 返回由调用数组中满足条件的元素组成的新数组。
ndarray.compress(条件)
加法乘法通用函数
add(a, a) # 两数组相加
add.reduce(a) # a数组元素累加和
add.accumulate(a) # 累加和过程
add.outer([10, 20, 30], a) # 外和
# 返回调用数组中所有元素的乘积——累乘。
ndarray.prod()
# 返回调用数组中所有元素执行累乘的过程数组。
ndarray.cumprod()
np.outer([10, 20, 30], a) # 外积
案例:
a = np.arange(1, 7)
print(a)
b = a + a
print(b)
b = np.add(a, a)
print(b)
c = np.add.reduce(a)
print(c)
d = np.add.accumulate(a)
print(d)
# + 1 2 3 4 5 6
# --------------------
# 10 |11 12 13 14 15 16 |
# 20 |21 22 23 24 25 26 |
# 30 |31 32 33 34 35 36 |
--------------------
f = np.add.outer([10, 20, 30], a)
print(f)
# x 1 2 3 4 5 6
# -----------------------
# 10 |10 20 30 40 50 60 |
# 20 |20 40 60 80 100 120 |
# 30 |30 60 90 120 150 180 |
-----------------------
g = np.outer([10, 20, 30], a)
print(g)
除法通用函数
np.divide(a, b) # a 真除 b
np.floor(a / b) # a 地板除 b (真除的结果向下取整)
np.ceil(a / b) # a 天花板除 b (真除的结果向上取整)
np.trunc(a / b) # a 截断除 b (真除的结果直接干掉小数部分)
np.round(a / b)
案例:
import numpy as np
a = np.array([20, 20, -20, -20])
b = np.array([3, -3, 6, -6])
# 真除
c = np.true_divide(a, b)
c = np.divide(a, b)
c = a / b
print('array:',c)
# 对ndarray做floor操作
d = np.floor(a / b)
print('floor_divide:',d)
# 对ndarray做ceil操作
e = np.ceil(a / b)
print('ceil ndarray:',e)
# 对ndarray做trunc操作
f = np.trunc(a / b)
print('trunc ndarray:',f)
# 对ndarray做around操作
g = np.around(a / b)
print('around ndarray:',g)
位运算通用函数
位异或:
c = a ^ b
c = a.__xor__(b)
c = np.bitwise_xor(a, b)
按位异或操作可以很方便的判断两个数据是否同号。
-8 1000
-7 1001
-6 1010
-5 1011
-4 1100
-3 1101
-2 1110
-1 1111
0 0000
1 0001
2 0010
3 0011
4 0100
5 0101
6 0110
7 0111
0 ^ 0 = 0
0 ^ 1 = 1
1 ^ 0 = 1
1 ^ 1 = 0
a = np.array([0, -1, 2, -3, 4, -5])
b = np.array([0, 1, 2, 3, 4, 5])
print(a, b)
c = a ^ b
# c = a.__xor__(b)
# c = np.bitwise_xor(a, b)
print(np.where(c < 0)[0])
位与:
e = a & b
e = a.__and__(b)
e = np.bitwise_and(a, b)
0 & 0 = 0
0 & 1 = 0
1 & 0 = 0
1 & 1 = 1
利用位与运算计算某个数字是否是2的幂
# 1 2^0 00001 0 00000
# 2 2^1 00010 1 00001
# 4 2^2 00100 3 00011
# 8 2^3 01000 7 00111
# 16 2^4 10000 15 01111
# ...
d = np.arange(1, 21)
print(d)
e = d & (d - 1)
e = d.__and__(d - 1)
e = np.bitwise_and(d, d - 1)
print(e)
位或:
|
__or__
bitwise_or
0 | 0 = 0
0 | 1 = 1
1 | 0 = 1
1 | 1 = 1
位反:
~
__not__
bitwise_not
~0 = 1
~1 = 0
移位:
<< __lshift__ left_shift
>> __rshift__ right_shift
左移1位相当于乘2,右移1位相当于除2。
d = np.arange(1, 21)
print(d)
# f = d << 1
# f = d.__lshift__(1)
f = np.left_shift(d, 1)
print(f)
三角函数通用函数
numpy.sin()
傅里叶定理:
法国科学家傅里叶提出,任何一条周期曲线,无论多么跳跃或不规则,都能表示成一组光滑正弦曲线叠加之和。
合成方波
一个方波由如下参数的正弦波叠加而成:
曲线叠加的越多,越接近方波。所以可以设计一个函数,接收曲线的数量n作为参数,返回一个矢量函数,该函数可以接收x坐标数组,返回n个正弦波叠加得到的y坐标数组。
x = np.linspace(-2*np.pi, 2*np.pi, 1000)
y = np.zeros(1000)
n = 1000
for i in range(1, n+1):
y += 4 / ((2 * i - 1) * np.pi) * np.sin((2 * i - 1) * x)
mp.plot(x, y, label='n=1000')
mp.legend()
mp.show()
特征值和特征向量
对于n阶方阵A,如果存在数a和非零n维列向量x,使得Ax=ax,则称a是矩阵A的一个特征值,x是矩阵A属于特征值a的特征向量
#已知n阶方阵A, 求特征值与特征数组
# eigvals: 特征值数组
# eigvecs: 特征向量数组
eigvals, eigvecs = np.linalg.eig(A)
#已知特征值与特征向量,求方阵
S = eigvecs * np.diag(eigvals) * eigvecs逆
案例:
import numpy as np
A = np.mat('3 -2; 1 0')
print(A)
eigvals, eigvecs = np.linalg.eig(A)
print(eigvals)
print(eigvecs)
print(A * eigvecs[:, 0]) # 方阵*特征向量
print(eigvals[0] * eigvecs[:, 0]) #特征值*特征向量
S = np.mat(eigvecs) * np.mat(np.diag(eigvals)) * np.mat(eigvecs.I)
案例:读取图片的亮度矩阵,提取特征值与特征向量,保留部分特征值,重新生成新的亮度矩阵,绘制图片。
import numpy as np
import scipy.misc as sm
import matplotlib.pyplot as mp
# 灰度图
origin = sm.imread('../da_data/lily.jpg', True)
print(origin.shape)
# 提取特征值
eigvals, eigvecs = np.linalg.eig(np.mat(origin))
# 抹掉一部分, 生成新图
eigvals[50:] = 0
dst = eigvecs * np.diag(eigvals) * eigvecs.I
mp.subplot(121)
mp.xticks([])
mp.yticks([])
mp.imshow(origin, cmap='gray')
mp.subplot(122)
mp.xticks([])
mp.yticks([])
print(dst.shape)
mp.imshow(dst.real, cmap='gray')
mp.show()
奇异值分解(可用于非方阵)
有一个矩阵M,可以分解为3个矩阵U、S、V,使得U x S x V等于M。U与V都是正交矩阵(乘以自身的转置矩阵结果为单位矩阵)。那么S矩阵主对角线上的元素称为矩阵M的奇异值,其它元素均为0。
import numpy as np
M = np.mat('4 11 14; 8 7 -2')
U,Sv,V = np.linalg.svd(M, full_matrices=False) # 返回的 V不需要完整的方阵
print(Sv)
S = np.diag(Sv)
print(U * S * V)
案例:读取图片的亮度矩阵,提取奇异值与两个正交矩阵,保留部分奇异值,重新生成新的亮度矩阵,绘制图片。
import numpy as np
import matplotlib.pyplot as mp
import scipy.misc as sm
original = sm.imread('../da_data/lily.jpg', True)
#提取奇异值 sv
U, sv, V = np.linalg.svd(original)
print(U.shape, sv.shape, V.shape)
sv[50:] = 0
original2 = np.mat(U) * np.mat(np.diag(sv)) * np.mat(V)
mp.figure("Lily Features")
mp.subplot(221)
mp.xticks([])
mp.yticks([])
mp.imshow(original, cmap='gray')
mp.subplot(222)
mp.xticks([])
mp.yticks([])
mp.imshow(original2, cmap='gray')
mp.tight_layout()
mp.show()
快速傅里叶变换模块(fft)
什么是傅里叶定理?
法国科学家傅里叶提出,任何一条周期曲线,无论多么跳跃或不规则,都能表示成一组光滑正弦曲线叠加之和。
什么是傅里叶变换?
即是基于傅里叶定理对一条周期曲线进行拆解的过程,最终得到一组光滑的正弦曲线。
傅里叶变换的目的是可将时域(即时间域)上的信号转变为频域(即频率域)上的信号,随着域的不同,对同一个事物的了解角度也就随之改变,因此在时域中某些不好处理的地方,在频域就可以较为简单的处理。这就可以大量减少处理信号存储量。
例如:弹钢琴
假设有一时间域函数:y = f(x),根据傅里叶的理论它可以被分解为一系列正弦函数的叠加,他们的振幅A,频率ω或初相位φ不同:
所以傅里叶变换可以把一个比较复杂的函数转换为多个简单函数的叠加,看问题的角度也从时间域转到了频率域,有些的问题处理起来就会比较简单。
傅里叶变换相关函数
导入快速傅里叶变换所需模块
import numpy.fft as nf
通过采样数与采样周期求得傅里叶变换分解所得曲线的频率序列
freqs = nf.fftfreq(采样数量, 采样周期)
通过原函数值的序列j经过快速傅里叶变换得到一个复数数组,复数的模代表的是振幅,复数的辐角代表初相位
nf.fft(原函数值序列) -> 目标函数值序列(复数)
通过一个复数数组(复数的模代表的是振幅,复数的辐角代表初相位)经过逆向傅里叶变换得到合成的函数值数组
nf.ifft(目标函数值序列(复数))->原函数值序列
案例:针对合成波做快速傅里叶变换,得到一组复数序列;再针对该复数序列做逆向傅里叶变换得到新的合成波并绘制。
ffts = nf.fft(sigs6)
sigs7 = nf.ifft(ffts).real
mp.plot(times, sigs7, label=r'$\omega$='+str(round(1 / (2 * np.pi),3)), alpha=0.5, linewidth=6)
案例:针对合成波做快速傅里叶变换,得到分解波数组的频率、振幅、初相位数组,并绘制频域图像。
import numpy as np
import numpy.fft as nf
import matplotlib.pyplot as mp
x = np.linspace(-2*np.pi, 2*np.pi, 1000)
y = np.zeros(1000)
n = 1000
for i in range(1, n+1):
y += 4 / ((2 * i - 1) * np.pi) * np.sin((2 * i - 1) * x)
# 傅里叶变换
complex_ary = nf.fft(y)
print(complex_ary.shape, complex_ary.dtype)
y_ = nf.ifft(complex_ary) # 合成波
freqs = nf.fftfreq(y_.size, x[1] - x[0]) # 频率
pows = np.abs(complex_ary) # 能量
mp.subplot(121)
mp.plot(x, y, label='n=1000')
mp.plot(x, y_.real, label='y_=1000', linewidth=7, alpha=0.2)
mp.legend()
mp.subplot(122)
mp.plot(freqs[freqs > 0], pows[freqs > 0], label='freqs')
mp.tight_layout()
mp.legend()
mp.show()
基于傅里叶变换的频域滤波
含噪信号是高能信号与低能噪声叠加的信号,可以通过傅里叶变换的频域滤波实现降噪。
通过FFT使含噪信号转换为含噪频谱,去除低能噪声,留下高能频谱后再通过IFFT留下高能信号。
案例:基于傅里叶变换的频域滤波为音频文件去除噪声。
- 读取音频文件,获取音频文件基本信息:采样个数,采样周期,与每个采样的声音信号值。绘制音频时域的:时间/位移图像。
import numpy as np
import numpy.fft as nf
import scipy.io.wavfile as wf
import matplotlib.pyplot as mp
sample_rate, noised_sigs = wf.read('../data/noised.wav')
noised_sigs = noised_sigs / 2 ** 15
times = np.arange(len(noised_sigs)) / sample_rate
mp.figure('Filter', facecolor='lightgray')
mp.subplot(221)
mp.title('Time Domain', fontsize=16)
mp.ylabel('Signal', fontsize=12)
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
mp.plot(times[:178], noised_sigs[:178],c='orangered', label='Noised')
mp.legend()
mp.show()
- 基于傅里叶变换,获取音频频域信息,绘制音频频域的:频率/能量图像。
freqs = nf.fftfreq(times.size, 1 / sample_rate)
noised_ffts = nf.fft(noised_sigs)
noised_pows = np.abs(noised_ffts)
mp.subplot(222)
mp.title('Frequency Domain', fontsize=16)
mp.ylabel('Power', fontsize=12)
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
mp.semilogy(freqs[freqs >= 0],noised_pows[freqs >= 0], c='limegreen',label='Noised')
mp.legend()
- 将低能噪声去除后绘制音频频域的:频率/能量图像。
fund_freq = freqs[noised_pows.argmax()]
noised_indices = np.where(freqs != fund_freq)
filter_ffts = noised_ffts.copy()
filter_ffts[noised_indices] = 0
filter_pows = np.abs(filter_ffts)
mp.subplot(224)
mp.xlabel('Frequency', fontsize=12)
mp.ylabel('Power', fontsize=12)
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
mp.plot(freqs[freqs >= 0], filter_pows[freqs >= 0],c='dodgerblue', label='Filter')
mp.legend()
- 基于逆向傅里叶变换,生成新的音频信号,绘制音频时域的:时间/位移图像。
filter_sigs = nf.ifft(filter_ffts).real
mp.subplot(223)
mp.xlabel('Time', fontsize=12)
mp.ylabel('Signal', fontsize=12)
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
mp.plot(times[:178], filter_sigs[:178],c='hotpink', label='Filter')
mp.legend()
- 重新生成音频文件。
wf.write('../../data/filter.wav',sample_rate,(filter_sigs * 2 ** 15).astype(np.int16))
随机数模块(random)
生成服从特定统计规律的随机数序列。
二项分布(binomial)
二项分布就是重复n次独立事件的伯努利试验。在每次试验中只有两种可能的结果,而且两种结果发生与否互相对立,并且相互独立,事件发生与否的概率在每一次独立试验中都保持不变。
# 产生size个随机数,每个随机数来自n次尝试中的成功次数,其中每次尝试成功的概率为p。
np.random.binomial(n, p, size)
二项分布可以用于求如下场景的概率的近似值:
- 某人投篮命中率为0.3,投10次,进5个球的概率。
sum(np.random.binomial(10, 0.3, 200000) == 5) / 200000
- 某人打客服电话,客服接通率是0.6,一共打了3次,都没人接的概率。
sum(np.random.binomial(3, 0.6, 200000) == 0) / 200000
超几何分布(hypergeometric)
# 产生size个随机数,每个随机数t为在总样本中随机抽取nsample个样本后好样本的个数,总样本由ngood个好样本和nbad个坏样本组成
np.random.hypergeometric(ngood, nbad, nsample, size)
正态分布(normal)
# 产生size个随机数,服从标准正态(期望=0, 标准差=1)分布。
np.random.normal(size)
# 产生size个随机数,服从正态分布(期望=1, 标准差=10)。
np.random.normal(loc=1, scale=10, size)
杂项功能
排序
np.msort(closing_prices)
联合间接排序
联合间接排序支持为待排序列排序,若待排序列值相同,则利用参考序列作为参考继续排序。最终返回排序过后的有序索引序列。
indices = numpy.lexsort((次排序序列, 主排序序列))
案例:先按价格排序,再按销售量倒序排列。
import numpy as np
prices = np.array([92,83,71,92,40,12,64])
volumes = np.array([100,251,4,12,709,34,75])
print(volumes)
names = ['Product1','Product2','Product3','Product4','Product5','Product6','Product7']
ind = np.lexsort((volumes*-1, prices))
print(ind)
for i in ind:
print(names[i], end=' ')
复数数组排序
按照实部的升序排列,对于实部相同的元素,参考虚部的升序,直接返回排序后的结果数组。
numpy.sort_complex(复数数组)
插入排序
若有需求需要向有序数组中插入元素,使数组依然有序,numpy提供了searchsorted方法查询并返回可插入位置数组。
indices = numpy.searchsorted(有序序列, 待插序列)
调用numpy提供了insert方法将待插序列中的元素,按照位置序列中的位置,插入到被插序列中,返回插入后的结果。
numpy.insert(被插序列, 位置序列, 待插序列)
案例:
import numpy as np
# 0 1 2 3 4 5 6
a = np.array([1, 2, 4, 5, 6, 8, 9])
b = np.array([7, 3])
c = np.searchsorted(a, b)
print(c)
d = np.insert(a, c, b)
print(d)
插值
scipy提供了常见的插值算法可以通过一组散点得到一个符合一定规律插值器函数。若我们给插值器函数更多的散点x坐标序列,该函数将会返回相应的y坐标序列。
func = si.interp1d(
离散水平坐标,
离散垂直坐标,
kind=插值算法(缺省为线性插值)
)
案例:
# scipy.interpolate
import scipy.interpolate as si
# 原始数据 11组数据
min_x = -50
max_x = 50
dis_x = np.linspace(min_x, max_x, 11)
dis_y = np.sinc(dis_x)
# 通过一系列的散点设计出符合一定规律插值器函数,使用线性插值(kind缺省值)
linear = si.interp1d(dis_x, dis_y)
lin_x = np.linspace(min_x, max_x, 200)
lin_y = linear(lin_x)
# 三次样条插值 (CUbic Spline Interpolation) 获得一条光滑曲线
cubic = si.interp1d(dis_x, dis_y, kind='cubic')
cub_x = np.linspace(min_x, max_x, 200)
cub_y = cubic(cub_x)
积分
直观地说,对于一个给定的正实值函数,在一个实数区间上的定积分可以理解为坐标平面上由曲线、直线以及轴围成的曲边梯形的面积值(一种确定的实数值)。
利用微元法认识什么是积分。
案例:
- 在[-5, 5]区间绘制二次函数y=2x2+3x+4的曲线:
import numpy as np
import matplotlib.pyplot as mp
import matplotlib.patches as mc
def f(x):
return 2 * x ** 2 + 3 * x + 4
a, b = -5, 5
x1 = np.linspace(a, b, 1001)
y1 = f(x1)
mp.figure('Integral', facecolor='lightgray')
mp.title('Integral', fontsize=20)
mp.xlabel('x', fontsize=14)
mp.ylabel('y', fontsize=14)
mp.tick_params(labelsize=10)
mp.grid(linestyle=':')
mp.plot(x1, y1, c='orangered', linewidth=6,label=r'$y=2x^2+3x+4$', zorder=0)
mp.legend()
mp.show()
- 微分法绘制函数在与x轴还有[-5, 5]所组成的闭合区域中的小梯形。
n = 50
x2 = np.linspace(a, b, n + 1)
y2 = f(x2)
area = 0
for i in range(n):
area += (y2[i] + y2[i + 1]) * (x2[i + 1] - x2[i]) / 2
print(area)
for i in range(n):
mp.gca().add_patch(mc.Polygon([
[x2[i], 0], [x2[i], y2[i]],
[x2[i + 1], y2[i + 1]], [x2[i + 1], 0]],
fc='deepskyblue', ec='dodgerblue',
alpha=0.5))
调用scipy.integrate模块的quad方法计算积分:
import scipy.integrate as si
# 利用quad求积分 给出函数f,积分下限与积分上限[a, b] 返回(积分值,最大误差)
area = si.quad(f, a, b)[0]
print(area)
金融相关
import numpy as np
# 终值 = np.fv(利率, 期数, 每期支付, 现值)
# 将1000元以1%的年利率存入银行5年,每年加存100元,
# 到期后本息合计多少钱?
fv = np.fv(0.01, 5, -100, -1000)
print(round(fv, 2))
# 现值 = np.pv(利率, 期数, 每期支付, 终值)
# 将多少钱以1%的年利率存入银行5年,每年加存100元,
# 到期后本息合计fv元?
pv = np.pv(0.01, 5, -100, fv)
print(pv)
# 净现值 = np.npv(利率, 现金流)
# 将1000元以1%的年利率存入银行5年,每年加存100元,
# 相当于一次性存入多少钱?
npv = np.npv(0.01, [
-1000, -100, -100, -100, -100, -100])
print(round(npv, 2))
fv = np.fv(0.01, 5, 0, npv)
print(round(fv, 2))
# 内部收益率 = np.irr(现金流)
# 将1000元存入银行5年,以后逐年提现100元、200元、
# 300元、400元、500元,银行利率达到多少,可在最后
# 一次提现后偿清全部本息,即净现值为0元?
irr = np.irr([-1000, 100, 200, 300, 400, 500])
print(round(irr, 2))
npv = np.npv(irr, [-1000, 100, 200, 300, 400, 500])
print(npv)
# 每期支付 = np.pmt(利率, 期数, 现值)
# 以1%的年利率从银行贷款1000元,分5年还清,
# 平均每年还多少钱?
pmt = np.pmt(0.01, 5, 1000)
print(round(pmt, 2))
# 期数 = np.nper(利率, 每期支付, 现值)
# 以1%的年利率从银行贷款1000元,平均每年还pmt元,
# 多少年还清?
nper = np.nper(0.01, pmt, 1000)
print(int(nper))
# 利率 = np.rate(期数, 每期支付, 现值, 终值)
# 从银行贷款1000元,平均每年还pmt元,nper年还清,
# 年利率多少?
rate = np.rate(nper, pmt, 1000, 0)
print(round(rate, 2))