Project-3:基于堆和循环桶实现 djikstra 算法
-
实验原理
-
堆: 堆是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于(或不小于)其左子节点和右子节点的值。最大堆和最小堆是二叉堆的两种形式。最大堆:根结点的键值是所有堆结点键值中最大者。最小堆:根结点的键值是所有堆结点键值中最小者。本次实验使用的是最小堆;
-
桶:桶排序定义:桶排序 (Bucketsort)或所谓的箱排序,是一个排序算法,工作的原理是将数组分到有限数量的桶子里。每个桶子再个别排序(有可能再使用别的排序算法或是以递归方式继续使用桶排序进行排序)。桶排序是鸽巢排序的一种归纳结果。当要被排序的数组内的数值是均匀分配的时候,桶排序使用线性时间(Θ(n))。但桶排序并不是 比较排序,他不受到 O(n log n) 下限的影响。【来源:百度百科】
可见桶排序的效率非常的高,因此利用桶来寻找最短距离标记有利于提升dijkstra算法的运行效率,不过在真正实现算法的过程中实现的桶与真正的桶排序有一定的区别,这是由dijkstra算法的特点所决定的:即每个桶里面的元素距离标记都一样;但是要保证这一点需要的是在每永久标记一次(即节点出桶一次)桶的头部要往后一个桶移动,并且要保证改变桶的头部之前要把头部桶里面的所有节点清除出桶,然后永久标记;
重复节点如何解决:其实不需要解决,因为第一次永久标记的节点距离标记肯定是最短的,因此在考虑标记某个节点的时候只要判断其是否在永久标记集合当中,如果在,则丢弃不再标记,否者永久标记;或者在把节点加入桶中的时候先判断该节点是否已经永久标记,已经标记就不再加入桶中;
桶的具体实现和封装: 所有的桶结构用一个list()来维持,而一个桶也是一个list(),BucketSet类封装了需要的桶结构,并且对外暴露add(_)和pop_min()方法用于添加元素到桶中和将最小距离标记节点取出,pop_min()方法返回最小距离标记的节点集合;Bucket类是BucketSet类的私有内部类,提供add()和pop_min() 方法功能与BucketSet的两个方法类似,具体更多实现细节见代码注释;
-
-
利用最小堆实现dijkstra算法
(1) 建立最小堆,首先把源节点加入堆中对堆初始化;
(2) 弹出最小距离标记节点;
(3) 将最小距离标记节点永久标记;
(4) 将永久标记节点的所有邻接节点入堆;
(5) 重复2-4操作,直到堆空,算法结束;- 代码实现
from time import time from heapq import heapify, heappop, heappush def heap_dijkstra(graph, s): """ 堆实现迪杰斯特拉算法 :param graph: 图 :param s: 源节点 :return: 各节点距离标记字典 """ node_s = (0, str(s)) # 源节点实体二元组(距离标记,节点) min_heap = list() # 用于建堆的列表 visited = {} # 永久标记节点集合 min_heap.append(node_s) heapify(min_heap) # 初始化最小堆 path_length = {} # 距离标记字典 while min_heap: # 堆非空 min_node = heappop(min_heap) # 取最小距离标记节点 if visited.get(min_node[1]) is None: # 该节点没有永久标记 visited[min_node[1]] = 1 # 永久标记该节点 path_length[min_node[1]] = min_node[0] # 记下该节点距离标记 node_neighbor_edges = graph.get_node_edges(min_node[1]) # 将所有邻接节点入堆 for e in node_neighbor_edges: heappush(min_heap, (e[0]+min_node[0], e[2])) return path_length
- 代码实现
-
利用循环桶实现dijkstra算法
(1) 初始化桶,把源节点加入到桶中;
(2) 弹出最小距离标记节点;
(3) 将最小距离标记节点永久标记;
(4) 将永久标记节点的所有邻接节点入桶;
(5) 重复2-4操作,直到所有桶空,算法结束;- 代码实现
from time import time from BucketSet import BucketSet def bucket_dijkstra(graph, s, max_edge_weight): """ 桶实现的迪杰斯特拉算法 :param graph: 图 :param s: 源节点 :param max_edge_weight: 最大边权 :return: 距离字典 """ buckets = BucketSet(max_edge_weight+1) # 创建循环桶对象 node_s = (0, str(s)) # 源节点二元组(距离标记,节点) visited = {} # 永久标记集合key是标记节点,所有value是1 buckets.add_thing(node_s) # 添加源节点到桶 path_length = {} # 路径长度字典 while not buckets.is_empty(): # 所有的桶未空 min_list = buckets.pop_min() # 取最小距离标记集合 返回列表 while not len(min_list) == 0: min_node = min_list.pop() if visited.get(min_node[1]) is None: # 该节点没有永久标记 visited[min_node[1]] = 1 # 永久标记该节点 path_length[min_node[1]] = min_node[0] # 记下路径长度 node_neighbor_edges = graph.get_node_edges(min_node[1]) # 将所有邻接节点加入桶中 for e in node_neighbor_edges: buckets.add_thing((e[0]+min_node[0], e[2])) return path_length # 返回各点距离字典
from copy import deepcopy from time import time class BucketSet(object): """ 循环桶类 """ __header_location = 0 # 桶头位置 __thing_amount = 0 # 所有桶中节点数量 def __init__(self, bucket_num): self.__bucket_num = bucket_num # 桶数量 self.__buckets = [None]*bucket_num # 建桶 self.__init_buckets(bucket_num) # 初始化桶 self.__header = self.__buckets[0] # 桶头是第一个桶 def __move_header_to_next(self): """ 头节点下移一个位置 :return: None """ self.__header_location = (self.__header_location+1) % self.__bucket_num self.__header = self.__buckets[self.__header_location] def __init_buckets(self, bucket_num): """ 初始化桶 :param bucket_num: 桶数量 :return: None """ for i in range(bucket_num): self.__buckets[i] = BucketSet.__Bucket(i) def __hash(self, length): """ 根据距离标记计算哈希值,决定放入的桶 :param length: 距离标记 :return: 返回桶编号 """ return length % self.__bucket_num def add_thing(self, thing): """ 节点添加到桶中 :param thing: :return: """ length = thing[0] # 距离标记 bucket_id = self.__hash(length) # 计算桶编号 self.__buckets[bucket_id].add_thing(thing) # 放入桶中 self.__thing_amount += 1 # 更新节点数量 def is_empty(self): """ 判断循环桶是否空 :return: Boolean 所有桶空返回TRUE,否者FALSE """ return self.__thing_amount == 0 def pop_min(self): """ 取最小距离标记的的节点 :return: 返回距离标记最小的桶的集合 """ while self.__header.is_empty(): # 该桶空就往后查询 self.__move_header_to_next() min_things = self.__header.pop() # 取桶中所有节点 self.__thing_amount = self.__thing_amount - len(min_things) # 更新循环桶中节点数量 self.__move_header_to_next() # 头桶往后移动 return min_things.copy() class __Bucket(object): """ 桶类 """ __thing_amount = 0 # 桶中的节点 def __init__(self, bucket_id): """ 初始化 :param bucket_id: 桶id """ self.list_thing = list() # 节点容器list self.id = bucket_id def add_thing(self, thing): """ 往桶里放节点 :param thing: :return: """ self.list_thing.append(thing) self.__thing_amount += 1 # 更新节点数量 def pop(self): """ 取出桶内节点 :return: 返回节点集合 """ things = self.list_thing.copy() self.list_thing.clear() self.__thing_amount = 0 # 桶内节点数量更新为0 return things def is_empty(self): """ 判断桶是否为空 :return: 桶空返回true,否则返回FALSE """ return self.__thing_amount == 0
-
测试数据
-
性能对比
- 点数:2000-58000 边数:3000—87000
- 点数:2000-80000;边数:4000-160000
- 点数:2000-58000 边数:3000—87000
-
结论
1.经过多组实验数据的对比来看,用桶来实现dijkstra算法的性能总体比堆实现要好一些;
2.两种算法的运行时间与节点数几乎成线性关系,与理论值很接近; -
微信公众号