import numpy as np
a=np.arange(10)
print(a)
b=np.array([2,4,3,5,2,1,4,5,3,2])
c=MA(a,b)
要求输出:
[nan nan 1. nan 3.5 5. 4.5 5. 7. 8.5]
也就是说,数组每个点的算术平均所使用的参数都可能是不同的.我现在用的办法是 map,速度很慢,想破脑袋也想不到更好的办法,不知道有没有什么高效的实现办法?
1
jmc891205 2018-04-06 11:11:15 +08:00
算术平均不是求和除以个数吗?
你这个结果是什么 看不懂 |
2
vegito2002 2018-04-06 11:13:41 +08:00 via iPad
看不懂问题
|
3
ericls 2018-04-06 11:23:14 +08:00 via iPhone
我估计 MA 是 moving average 吧…… ??
|
4
noe132 2018-04-06 11:56:29 +08:00 via Android
看不懂问题+1
|
5
wizardforcel 2018-04-06 12:09:50 +08:00
MA ?? np.convolve 了解一下。。
|
6
kysida 2018-04-06 12:21:54 +08:00
试试 numpy.mean()???
|
7
RecursiveG 2018-04-06 15:37:07 +08:00
楼主的表达能力堪忧啊。
我估计楼主是想要实现 `c[i]=avg(a[i-b[i]+1:i+1]) if i-b[i]+1>=0 else NaN` 不担心数字太小的话可以先算 a 的前缀和。 |
8
RecursiveG 2018-04-06 15:38:02 +08:00
更正:不担心数字太大的话可以先算 a 的前缀和
|
9
Kirscheis 2018-04-06 16:49:16 +08:00 via Android 1
论 dp 在计算机编程中的经典应用
|
10
dwjgwsm OP 抱歉,是我大意了,我以为都能看懂呢,MA 是我自己写的移动平均函数,用的 map 遍历.我想来想去觉得想找更好算法这个问题无解. 卷积,我也想过,问题是每个元素的权重都不同吧. 我没有信心了,想弃楼了
|
13
Kirscheis 2018-04-06 22:19:07 +08:00
我靠,出去玩了一天你这个帖子竟然还没有人给实现。。。看来你的表达 v 友估计都没看懂
这就是 dp 的例题啊老哥 首先假设你的 a,b 长度都是 n。如果不是,rightpad b。O(1) 在 a 后面 append INFINITY,然后把 a 变成循环数组,并且以下所说的数组都是循环数组。O(1) b 实际上是区间集,把它转换成左右指标的集 c。O(n) d = sorted(c)。O(nlogn) 初始化二维 array dp[2n][2n]。O(1) //求 dp,O(n) for i in range(len(c)): dp [ d[i] ] [ d[i+1] ] = sum( a [d[i], d[i+1]] ) 最后,把区间碎片的和拼接成所需的和,存进 e。worst case O(n) ans = e ./ b,./是 element-wise division。O(n) 最后检查 ans,把所有 INFINITY 替换成 NaN。O(n) 总复杂度 O(nlogn),和排序差不多 |
14
dwjgwsm OP @Kirscheis 对的,a b 数组长度相等, 我再描述一下吧,我们计算平均值,都有个参数 n:n 个数字的平均值, 在这里,数组 b=np.array([2,4,3,5,2,1,4,5,3,2])的每个元素都是对应位置的 n ,比如返回数组是 result_arr,则:
result_arr[9]=(a[8]+a[9])/b[9] #b[9]==2,所以是 2 个元素的平均值 result_arr[8]=(a[6]+a[7]+a[8])/b[8] #b[8]==3,所以是 3 个元素的平均值 不过大哥,你说的我表示完全看不懂啊. 你说"这是 dp 的例题",出自哪里啊? |
15
jmc891205 2018-04-07 03:54:36 +08:00 via iPhone
@dwjgwsm 懂你的题目了
一种做法是可以先用数组 a 构建一棵线段树 然后遍历 b 去线段树上查询你需要的区间的和 |
16
aijam 2018-04-07 05:15:58 +08:00
quick and dirty: [np.mean(a[i - size + 1: i+1]) if i - size + 1 >= 0 else np.NaN for i, size in enumerate(b)]
|
19
wingkou 2018-04-07 10:02:51 +08:00 via Android
|
20
Kirscheis 2018-04-07 10:05:25 +08:00
@dwjgwsm 晕了,不知道你有没有算法背景,我上面讲的都是一些很简单的概念啊。。建议你再去随便找本算法书看看 dynamic programming 的章节。。
我再倒着给你讲一遍好了 你要的最后的结果是一个列表,而你的 b 列表里的元素实际上就是除因子,所以这个问题本质是求 a 列表中一些不同长度和起始点的区间的和。你的 b 列表给出了这些区间的起始范围,所以可以转化成坐标对的形式。直接对这些坐标排序,实际上是裂解了你要求的那些区间 举例,比如你想求一个列表在这些区间的和:(2, 10), (5, 7), (3, 16), (8, 23) 对坐标排序给出 ((2,3,5,7,8,10,16,23)) 依次求出上面这些小区间的局部和,并且存在一个表格里,那么将来要用的时候就不用反复求和了。这一步操作只需要线性时间 那么就有 sum(2, 10) = sum(2, 3)+ sum(3, 5)+ sum(5, 7)+ sum(7, 8) + sum(8, 10) 其它区间类似 最后把这些区间和的每一项除以 b 列表的对应元素 (element-wise division),就是你要的那些平均值了,这一步也是线性时间的 以上这些算法我设计的时候都考虑了并行优化,也就是说它们都是可以直接 GPU/FPGA 加速的。如果你的数据集真的很大,这个算法的耗时和快排是基本一样的 这样讲你能明白吗。。再不明白我就没办法了😂 至于为什么要在 a 后面 append 一个 INFINITY,再把 a 变成循环数组,这是因为你的区间有可能会 index out of range,这样干了之后任何 index out of range 的区间的局部和都是 INFINITY,求平均之后还是 INFINITY。因此,你最后检查一遍结果,如果发现 INFINITY,就知道这个元素对应的区间 index out of range 了,于是就把它换成 NaN。这是一个设计算法的时候常用的技巧,在具体实现的时候,把 INFINITY 设置成一个很大的常数,比如在 64 位机上 0xEFFFFFFFFFFFFFFF,规定这个数附近的数都是 INFINITY 就可以了 |
21
Kirscheis 2018-04-07 10:12:33 +08:00
@dwjgwsm 我上面说的办法实际上就是线段树稍微改了一下,如果你求和不需要并行加速,那直接用前缀和是最简单的,直接利用 sun(i, j) = sa(j) - sa(i) 的性质就好了
老哥你该复习一下算法了。。 |
22
wingkou 2018-04-07 10:25:00 +08:00
@Kirscheis 刚开始没怎么看你的 dp😂,后来看了下,感觉有点奇怪,dp 不是讲状态转移的吗?您的状态转移方程是
`dp [d[i]] [d[i+1]]=sum(a[d[i]:d[i+1]])`? 感觉不是状态转移吧?数组 dp 只在等式左侧出现。感觉只是个优化吧? 另外前缀和除了预处理之外,后面的能并行。 |
23
Kirscheis 2018-04-07 10:35:22 +08:00
@wingkou 是的,这只是分治,一开始想写 dp 的,后来想了想不好并行改了,不过那个是我昨晚喝了酒回来写的,又赶着打游戏。。写得很乱
楼主的题目有点特殊,因为他的区间右端点实际上把集合完全分成单个元素了,所以求和的求和也会有大量重合,应该加上 dp[i][k] = dp[i][j] + dp[j][k]的 不好意思,变量名乱写误导了 |
24
wingkou 2018-04-07 10:47:28 +08:00
@Kirscheis 啊,对耶!“他的区间右端点实际上把集合完全分成单个元素了”,所以你的`d = sorted(c)`去重之后一定是 range(len(a))😂算是负优化了😂刚刚还没注意到这点。
|
25
ipwx 2018-04-07 11:37:22 +08:00
@Kirscheis 老哥,这个问题的另一个难点是如何用 Python 高效地实现楼主的需求。
这么说吧,用 Python 裸写一个 for (n): c[i] = a[i] + b[i] 要比 NumPy 写 c = a + b 至少慢 20 倍。没办法,NumPy 用 C 写的,而且有些操作还有 Intel SIMD 指令集加成,比不过的。 NumPy 的基本操作大致有:任意维张量的(每个对应元素)加减乘除、比较(判等和大小,输出布尔向量),布尔向量当做整形向量参与运算,任意维两个张量后两维、前两维的点积(这个 carefully 优化过,相信是考虑过指令集和 cache line 之类的各种问题的)。 由于这个限制,Python + NumPy 写程序的时候通常会“多做一些运算”,以求更短的执行时间。比如: flag = (a > b) c = flag * a + (1 - flag) * b 换成 C 语言你相信这个比 for 循环更快? - - - - 昨天我就看到这个问题了,但是恕我愚钝,我想不出利用 NumPy 在 Python 里面优雅地解决这个问题的方案。 |
27
RecursiveG 2018-04-07 12:47:45 +08:00
希望楼主解释一下你的“用 map 一个子函数来实现的”具体是怎么实现的,至少我没看出来。
然后你算法的时间复杂度是多少?你期望的算法时间复杂度是多少?你的数据量有多大? 是只需要算法优化,还是需要考虑别的因素?(并行 /GPU etc.) 普通算法有前缀和 O(n)或者线段树 O(nlogn)(本质都是区间和问题) @Kirscheis 有 O(nlogn)的可以并行的算法( taken as-is ) 或者直接根据 b 数组构造一个 n*n 的矩阵 Q 使得 c=aQ 然后用矩阵乘法。 |
28
dwjgwsm OP @RecursiveG 我先把我写的函数放上来吧,当时我觉得没必要贴一坨代码.刚从外面回来,楼上的各位回复还没仔细看.贴完了,在慢慢看
|
29
dwjgwsm OP def MA(npdata,narr): #narr 是一个和 npdata 等长的 ndarray 数组
L=len(npdata) j=np.arange(1,L+1) #当时还不知道有 enumerate 这个东西,所以传了一个定位数组进去(从 aijam 那儿学了一招,谢谢!) def IMA(n, k): if k < n or n<0 or np.isnan(n): return np.nan return np.mean(npdata[k - int(n):k]) return np.array(list(map(IMA, narr, j))) 其实就是和 aijam 一样的遍历算法. 我待会儿试试用列表推导和 map 哪个快,之前网上说应该是 map 快. 之前没有装 cython.昨天买了一个大硬盘重装了系统,把 cython 装上去了(因为 vs 的缘故).准备上 cython 看看.回头报告结果 |
30
dwjgwsm OP @RecursiveG 数据长度几百,但是调用频率特别高,还有其他十几个类似的函数,所以总体运行下来慢的要死,必须想办法从各种角度优化
|
31
jmc891205 2018-04-07 23:31:11 +08:00
纠结列表推导和 map 哪个快毫无意义
就算上了 C,前缀和和线段树也是比遍历快的多的算法 |
32
dwjgwsm OP @Kirscheis 谢谢你讲的这么详细,我不是科班出身啊,没学过这些算法.不过大家说的前缀和,线段树,dp,我网上大致看了一下.大体的思路就是把数据分割小,避免重复计算.回头去买本算法的书学一下吧.我觉得作用可能不会很大,因为我的情况不是数据长度很大. 先说一下测试结果:
测试了长度为 10 万的数据, 1.在 python 中,map 和列表推导速度基本相同,大概是 2 秒 2.在参数检查中 not np.isnan()会将速度降低 40%左右,这是之前没注意到的 3.对比 cython 和 python,cython 列表推导只比 python 快 12%(想试一下 map,结果还不知道怎么实现,cython 中好像无法实现子函数,编译报错) |
33
ipwx 2018-04-08 09:49:42 +08:00
@dwjgwsm
1、Cython 可以写函数。 2、Cython 最好不要用 map、列表推导之类的,直接用 for。 3、Cython 性能提升的关键是用上 C 一样的指针或者数组直接读写,不要用 Python 对象。 4、Cython 支持对 NumPy 数组进行直接读写。当然,不是直接用 NumPy 数组对象,你得查文档。 |
34
dwjgwsm OP @ipwx 试过了,基本上看不出提升效果.
import numpy as np cimport numpy as np cimport cython np.seterr(invalid='ignore') cdef int CMAX(int x,int y): return x if x>y else y cdef int CMIN(int x,int y): return y if x>y else x DTYPE1 = np.float DTYPE2 = np.int ctypedef np.float_t DTYPE1_t ctypedef np.int_t DTYPE2_t @cython.boundscheck(False) @cython.wraparound(False) def MA(np.ndarray[DTYPE1_t, ndim=1] npdata,np.ndarray[DTYPE2_t, ndim=1] n): cdef int L=npdata.shape[0] cdef int i=0 cdef np.ndarray res=np.zeros(L) for i from 0<=i<L: try: res[i]=np.mean(npdata[i + 1 - n[i]: i + 1]) except: res[i] =np.nan return res |
35
jmc891205 2018-04-08 22:45:36 +08:00 via iPhone
@dwjgwsm 你对每一个位置都用 np.mean 来求平均值 最坏情况下要做多少次多余的加法你知道不?
|
36
ruoyu0088 2018-04-10 07:09:34 +08:00
用一个累加数组,可以提高计算速度:
import numpy as np a=np.arange(10) b=np.array([2,4,3,5,2,1,4,5,3,2]) ac = np.r_[0, np.cumsum(a)] be = np.arange(1, b.shape[0] + 1) bs = be - b res = (ac[be] - ac[bs]) / b res[bs<0] = np.nan |