二分查找python实现

 
res_list = []
i=0
def halfSearch (func, left, right, x): 
    global i
    i = i+1
    
    if right > left: 
#         print(i,left,right)
        val = func(left, right) 
        if val < x:
            res_list.append([left, right])
        else:
            
            mid = int(left + (right - left)/2)

            if func(left, mid) <= x: 
                res_list.append([left, mid])
            else:
                halfSearch(func, left, mid, x) 
                
            if func(mid, right) <= x: 
                res_list.append([mid, right])
            else:
                halfSearch(func, mid+1, right, x) 
    else: 
        # 不存在
        return -    # 基本判断1
    
区间个数-造数

背景

 
账户表只有账户有索引,但不连续,并且跨度极大 
要用这个索引,但一次不能查询太多数据,会对线上造成压力 

现用二分查找确定一个批次的区间边界

场景数据模拟

 
import random 
from ai.box.db import DbConnect


class Sql():

    create_acc="""
    CREATE TABLE if not exists acc(
        id bigint(11)    NOT NULL AUTO_INCREMENT primary key COMMENT '主键',
        c_acc bigint(19),
        c_time datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '提交时间',
        index(c_acc));
    """
    
    
class MysqlInit(DbConnect):

    def __init__(self) -> None:
        super().__init__(db_file="/opt/tpf/aiwks/code/aisty/ai/box/db.db")
        
    def create_acc(self):
        """创建记录表
        """
        with self.mysql() as connection:
            cursor = connection.cursor()
            cursor.execute(Sql.create_acc)
            connection.commit()
            cursor.close()
    

    def gen_data(self):
        acc = 1
        for i in range(100000):
            add = random.randint(10,100000000) 
            acc = i + add
            sql = "insert into acc(c_acc)values({})".format(acc)
            with self.mysql() as connection:
                cursor = connection.cursor()
                cursor.execute(sql)
                connection.commit()
                cursor.close()
            
mi = MysqlInit()
mi.create_acc()
mi.gen_data()     
    
区间个数-二分计算

 
import pandas as pd 
from ai.box.db import DbConnect

class Sql():
    acc_count = """select count(c_acc) 
        from db1.acc 
        where c_acc>{} and c_acc<={}"""
    
    acc_check = "select count(c_acc) from db1.acc where c_acc>1 and c_acc<=100000000"
    
res_list = []
i=0
def halfSearch (func, left, right, x): 
    global i
    # 基本判断
    i = i+1
    
    if right > left: 
#         print(i,left,right)
        val = func(left, right) 
        if val < x:
            res_list.append([left, right, val])
        else:
            
            mid = int(left + (right - left)/2)

            val1 = func(left, mid) 
            if val1 <= x: 
                res_list.append([left, mid, val1])
            else:
                halfSearch(func, left, mid, x) 
            
            val2 = func(mid, right)
            if val2 <= x: 
                res_list.append([mid, right, val2])
            else:
                halfSearch(func, mid+1, right, x) 
    else: 
        # 不存在
        return -1

    

class MysqlInit(DbConnect):
    """mysql创库后初始化操作
    """
    def __init__(self) -> None:
        super().__init__(db_file="/opt/tpf/aiwks/code/aisty/ai/box/db.db")


    def db_count(self,begin,end):
        sql = Sql.acc_count.format(begin,end)
#         print(sql)
        with self.mysql() as connection:
            cursor = connection.cursor()
            cursor.execute(sql)
            col = [c[0] for c in cursor.description]
            res = cursor.fetchall()
            data = pd.DataFrame(res,columns=col) 
            cursor.close()
        return data.iloc[0][0]
    
    def count(self, sql):
        with self.mysql() as connection:
            cursor = connection.cursor()
            cursor.execute(sql)
            col = [c[0] for c in cursor.description]
            res = cursor.fetchall()
            data = pd.DataFrame(res,columns=col) 
            cursor.close()
        return data.iloc[0][0]
        

mi = MysqlInit()
left=1
right=100000000
halfSearch (mi.db_count, left=left, right=right, x=5000)

res_list
[[1, 25000000, 3299],
 [25000000, 50000000, 3239],
 [50000001, 75000000, 3444],
 [75000000, 100000000, 3276]]

区间个数-验证

 
data=pd.DataFrame(res_list,columns=["A","B","C"])
print("二分计算累计:",data.C.sum())
print("数据库实际行数:",mi.count(Sql.acc_check))

二分计算累计: 13258
数据库实际行数: 13258

区间个数-优化

 
res_list
[[1, 25000000, 3299],
[25000000, 50000000, 3239],
[50000001, 75000000, 3444],
[75000000, 100000000, 3276]]
    
有的下一个的left等于上一个的right
有的则是上一个right+1 
如果是+1,则会遗漏一个数据 

 
res_list = []
i=0
def halfSearch (func, left, right, x): 
    global i
    # 基本判断
    i = i+1
    
    if right > left: 
#         print(i,left,right)
        val = func(left, right) 
        if val < x:
            res_list.append([left, right, val])
        else:
            
            mid = int(left + (right - left)/2)

            val1 = func(left, mid) 
            if val1 <= x: 
                res_list.append([left, mid, val1])
            else:
                halfSearch(func, left, mid, x) 
            
            val2 = func(mid, right)
            if val2 <= x: 
                res_list.append([mid, right, val2])
            else:
                halfSearch(func, mid, right, x) 
    else: 
        # 不存在
        return -1

左开右闭的区间

 
left=1
right=100000000
halfSearch (mi.db_count, left=left, right=right, x=1000)

res_list
[[1, 6250000, 796],
[6250000, 12500000, 811],
[12500000, 18750000, 846],
[18750000, 25000000, 846],
[25000000, 31250000, 776],
[31250000, 37500000, 810],
[37500000, 43750000, 799],
[43750000, 50000000, 854],
[50000000, 56250000, 879],
[56250000, 62500000, 848],
[62500000, 68750000, 902],
[68750000, 75000000, 815],
[75000000, 81250000, 785],
[81250000, 87500000, 889],
[87500000, 93750000, 817],
[93750000, 100000000, 785]]

每个区间都是左开右闭,这样,所有区间就无缝连接起来了
也看具体业务需求,不要求这个,就无所谓 

参考