文件路径

文件绝对路径

 
import os

file_name = 'a.conf'
absolute_path = os.path.abspath(file_name)
print(absolute_path)   #/opt/tpf/aitpf/cmd/a.conf

文件编码格式

 
import chardet

file_path="E:\\tpf\\aitpf\\bigmodel\\agent\\prompts\\tools\\excel_analyser.txt"

# 检测文件编码格式
def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read())
    return result['encoding']

# file_path = 'your_file_path'
encoding = detect_encoding(file_path)
print("encoding:",encoding)

# 使用检测到的编码格式打开文件
with open(file_path, 'r', encoding=encoding) as f:
    content = f.read()
    
print(content)
    
    

 

    

 

    

 

    

 


 

  

 


python读取文本

 
file = open('a.txt','w')
#向文件中输入字符串
file.write('a\n')
file.write('b\n')
file.write('c\n')
file.close()  

f=open('a.txt','r')
#读取所有内容
f.read()  #'a\nb\nc\n'
f.close()

 
a+ 追加 
w+ 可读可写,写覆盖 
w 写覆盖
r 只读,文件不存在时报错

open(file_name,"r","utf-8")

文件读写方法

 
f.read()       读全部内容 

f.readline()   逐行读

f.readlines()  所有行

f.write()         写 

f.writelines()   写列表

python按行读取文本

 
#-- coding: UTF-8 --

from ai.params import csv_path1

# 方式一
for line in open(csv_path1):
    print(line)

# 方式二
with open(csv_path1) as f:
    line = f.readline() 
    while line:
        print(line, end = '\n') 
        line = f.readline()

# 方式三
with open(csv_path1) as f:
    lines = f.readlines(10) 
    if lines :
        for line in lines:
            print(line)

utf8,utf-8,UTF8,UTF-8都可以

 
cat a.txt 
a
with open(file="a.txt",mode='r',encoding='utf8') as f:
    line = f.readline()
    print(line)
    line = f.readline()
    print(f"--{line}--")
    print(line.strip()) #什么也没有,在python中,空行也可以.处理方法而不报错

python读取文件大小

单位:字节

 
fpath = "aa.log"

import os
stt = os.stat(fpath)
print(stt.st_size) #5

 
from pathlib import Path
f = Path(fpath)
size = f.stat().st_size
print(size) #5

 
import os
size = os.path.getsize(fpath)
print(size) #5

读目录下所有文本并批次处理

 
import os 

def to_mysql(value_list):
    print(value_list)
    
    
base_dir="need_todbc"
# 获取当前目录下的所有文件
files = [os.path.join(base_dir, file) for file in os.listdir(base_dir)]

value_list_tmp = []
row_num = 0
max_count_todb = 7

for file in files: 
    if file.endswith(".txt"):  
        with open(file,mode='r',encoding='utf8') as f:
            line = f.readline() 
            while line:
                value_list_tmp.append(line.strip())
                line = f.readline()
                row_num = row_num+1
                if row_num>=max_count_todb:
                    row_num = 0
                    to_mysql(value_list_tmp)
                    value_list_tmp=[]
                    
if len(value_list_tmp)>0:
    to_mysql(value_list_tmp)
    value_list_tmp=[]
    

批次处理,并可中断可反复读取

 
import os 

def deal_func(value_list):
    print(value_list)
    
    
def batch_read(base_dir="need_todbc", save_file = "readed_file.txt", batch_size = 5):
    # 获取当前目录下的所有文件
    files = [os.path.join(base_dir, file) for file in os.listdir(base_dir)]

    value_list_tmp = []
    value_list_old =[]
    row_num = 0
    
    if os.path.exists(save_file):
        readed_file = open(save_file,'r')
        value_list_old = readed_file.readlines()
        readed_file.close()
        line_num = 0
        for line in value_list_old:
            value_list_old[line_num] = line.strip()
            line_num = line_num+1

    write_file = open(save_file,'a+')
    for file in files: 
        if file.endswith(".txt") and (file not in value_list_old):  
            with open(file,mode='r',encoding='utf8') as f:
                line = f.readline() 
                while line:
                    value_list_tmp.append(line.strip())
                    line = f.readline()
                    row_num = row_num+1
                    if row_num>=batch_size:
                        row_num = 0
                        deal_func(value_list_tmp)
                        value_list_tmp=[]
            if len(value_list_tmp)>0:
                deal_func(value_list_tmp)
                value_list_tmp=[]
            value_list_old.append(file)#文件写完后保存一下 
            write_file.write(f"{file}\n")
    write_file.close() 
    
batch_read(base_dir="need_todbc", save_file = "readed_file.txt")

读取文本最后n行

 
import os 

def last_line(filename, last_n = 2):
    """读取一个文件的最后n行,前去除前后空格 
    fp.seek(offset, 2):
    - where=0,1,2分别表示从文件头,当前指针位置,文件尾偏移,缺省值为0
        - where=2,文件打开的方式必须是二进制打开,即使用'rb'模式
    
    """
    try:
        filesize = os.path.getsize(filename)
        if filesize == 0:
            return None
        else:
            with open(filename, 'rb') as fp: # to use seek from end, must use mode 'rb'
                offset = -16 
                while -offset < filesize:   # 
                    fp.seek(offset, 2)      #
                    lines = fp.readlines()  #
                    if len(lines) >= last_n:     
                        res = lines[-last_n:]
                        return ([(v.decode("utf8")).strip()  for  v in res])  #
                    else:
                        offset *= 2         
                fp.seek(0)  # 边界条件的处理,只有一行时 
                lines = fp.readlines()
                return ((lines[-1]).decode('utf8')).strip()
    except FileNotFoundError:
        print(filename + ' not found!')
        return None

python字符串转字典

strip可去除首尾的字符

 
$ cat /opt/tpf/aiwks/datasets/text/a.cvs
{aa=1,bb=2}


from ai.params import csv_path1

with open(csv_path1) as f:
    lines = f.readlines(64) 
    if lines :
        for line in lines:
            kv = line.replace(" ","").strip('{').strip('}').split(",")
            row = {}
            for elem in kv:
                tmp = elem.split("=")
                row[tmp[0]] = tmp[1]
            print(row)

python读取压缩文件

pandas读取csv zip压缩文件

 
import zipfile
import pandas as  pd

zp = zipfile.ZipFile("data/aa.zip", 'r')
names = zp.namelist()
f = zp.open(names[0])
data = pd.read_csv(f)
f.close()

python读取tar.gz文件

 
import tarfile
import pandas as  pd
tar = tarfile.open('data/aa.tar.gz')
name = tar.getnames()         #获取被压缩文件的名字,list形式
tar.extractall('./tmp')       #解压后文件存放的路径
df = pd.read_csv('./tmp/' + name[0])   

python读取zip压缩文件

 
import zipfile
import pandas as  pd
    
class ZipReader(object):
    '''python读取zip压缩文件
    '''
    def __init__(self, zip_path):
        '''
        :param zip_path: zip文件路径
        '''
        self.zip = zipfile.ZipFile(zip_path, 'r')  # 创建一个zipfile
    
    def get_filecount(self):
        '''
        :return: 返回压缩包里面的文件个数
        '''
        return len(self.zip.namelist())
    
    def get_files(self):
        '''generator ,每次返回一个文件的内容
        '''
        for name in self.zip.namelist():
            yield self.read_lines(name)  # 生成器
    
    def read_lines(self, name):
        '''list列表,每个元素为一行
        '''
        return [line.decode() for line in self.zip.open(name).readlines()]
    
    
    def get_filenames(self):
        '''zip文件里面的所有文件名
        '''
        return self.zip.namelist()
    
    def extract_to(self, path):
        '''解压路径
        '''
        self.zip.extractall(path)
        return path
    
    def read_cvs(self, name):
        '''读取单个csv文件并转为pandas
        '''
        f = self.zip.open(name)
        data = pd.read_csv(f)
        f.close()
        return data
    
    def read_csvs(self):
        '''generator,每次返回一个csv文件的内容
        '''
        for name in self.zip.namelist():
            yield self.read_cvs(name)  # 生成器

if __name__ == "__main__":
    zp = ZipReader(zip_path="data/aa.zip")
    print("文件个数:", zp.get_filecount())
    print("文件名列表:", zp.zip.namelist())
    print("文件内容(所有文件):", str(list(zp.get_files())))
    print("解压路径:", zp.extract_to("./test"))
    for data in zp.read_csvs():
        print(type(data))  # class 'pandas.core.frame.DataFrame'
        break

python统计文件个数

统计指定目录下特定格式文件的个数

 
import datetime
import os

def file_counts(filePath="/tmp/logs", nearent_days = 7, expect_count=7):
    """统计最近几天生成文件个数,文件名称以日期开头 
    """
    file_num = 0
    for i in range(1, nearent_days+1):
        day = datetime.datetime.now() - datetime.timedelta(days=i)
        day = day.strftime('%Y-%m-%d')
        day_num = 0

        # os.listdir 读取出当前文件夹下的文件夹和文件
        for file in os.listdir(filePath): 
            if file.endswith(".zip") and file.startswith(day):  
                file_num += 1 
                day_num += 1
        if day_num != expect_count :
            print(day,day_num)
    print(f'最近{nearent_days}天文件总数为:{file_num}个--------')

file_counts()

python字符串包含

in or not in

 
"go" in "good"       # True 
"bad" not in "good"  # True 

删除最后一个字符

使用切片

 
a="a,a,a,"
a[:-1]  # 'a,a,a'

python追加100W行数据

writelines

 
import time
start_time = time.time() # 记录程序开始时间

for i in  range(1000):
    with open('example.txt', 'w') as file:
        lines = []
        for i in  range(1000):
            lines.append('{}'.format(i))

        file.writelines(lines)

end_time = time.time()  # 记录程序结束时间
tim_ms = round((end_time - start_time)*1000,2)

print("程序运行时间为", tim_ms, "毫秒")

 
程序运行时间为 2629.4 毫秒

 
import time
start_time = time.time() # 记录程序开始时间

for i in  range(10000):
    with open('example2.txt', 'w') as file:
        file.write('{}'.format(i))

end_time = time.time()  # 记录程序结束时间
tim_ms = round((end_time - start_time)*1000,2)

print("程序运行时间为", tim_ms, "毫秒")

 
程序运行时间为 21376.61 毫秒

这是1W次循环的时间,下面还测试一下无打开关闭的时间 

 
import time

file = open('example3.txt', 'a')
start_time = time.time() # 记录程序开始时间

for i in  range(1000000):
    file.write('{}'.format(i))
file.close()

end_time = time.time()  # 记录程序结束时间
tim_ms = round((end_time - start_time)*1000,2)

print("程序运行时间为", tim_ms, "毫秒")


 
程序运行时间为 449.95 毫秒

看来打开关闭在数据量大的情况下,相当耗时

读写方法

 
def write(obj,file_path):
    """
    直接将对象转字符串写入文件,这样可以在文件打开时,看到原内容,还可以进行搜索
    """
    ss = str(obj)
    with open(file_path,"w",encoding="utf-8") as f:
        f.write(ss)

def read(file_path):
    with open(file_path,'r',encoding="utf-8") as f:
        c = eval(f.read())
        return c 

python open file mode

 
r   只读 按字符,文件必须存在
r+  读写 按字符,文件必须存在

rb  只读 按字节,文件必须存在
rb+ 读写 按字节,文件必须存在

w   只写 按字符,文件不存在则创建,存在则覆盖
w+  读写 按字符,文件不存在则创建,存在则覆盖

wb  只写 按字节,文件不存在则创建,存在则覆盖
wb+ 读写 按字节,文件不存在则创建,存在则覆盖

python read

f.read()

 
f = open("a.txt",mode="rb")

print(f.read())#一次读取所有内容

f.close()

 
b'a\nd\nd \na\n\xe5\x9c\xa8\n\xe5\x9c\xa8\xe8\xa6\x81\xe5\xb7\xa5\n'

f.read(size)

 
一次读取指定的字节数 

参考
    Python读写文件
    
    python中文件读写mode参数