Absolute file path
import os

file_name = 'a.conf'
absolute_path = os.path.abspath(file_name)
print(absolute_path)  # /opt/tpf/aitpf/cmd/a.conf
Detecting a file's encoding with chardet

import chardet

file_path = "E:\\tpf\\aitpf\\bigmodel\\agent\\prompts\\tools\\excel_analyser.txt"

# Detect the file's encoding
def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read())
    return result['encoding']

# file_path = 'your_file_path'
encoding = detect_encoding(file_path)
print("encoding:", encoding)

# Open the file using the detected encoding
with open(file_path, 'r', encoding=encoding) as f:
    content = f.read()
    print(content)
file = open('a.txt', 'w')
# write strings into the file
file.write('a\n')
file.write('b\n')
file.write('c\n')
file.close()

f = open('a.txt', 'r')
# read the entire content
f.read()   # 'a\nb\nc\n'
f.close()
Common open modes:

a+   append, readable
w+   read/write, truncates existing content
w    write-only, truncates existing content
r    read-only, raises FileNotFoundError if the file does not exist

Note that encoding must be passed as a keyword argument, because the third positional parameter of open() is buffering:

open(file_name, "r", encoding="utf-8")
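For example, a minimal sketch of 'a+' together with the encoding keyword (the file name demo.txt is just an illustration):

with open('demo.txt', 'a+', encoding='utf-8') as f:  # hypothetical file name
    f.write('new line\n')   # appended; the file is created if it does not exist
    f.seek(0)               # 'a+' starts positioned at the end, so seek back before reading
    print(f.read())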
File read/write methods
f.read()        read the entire content
f.readline()    read one line at a time
f.readlines()   read all lines into a list
f.write()       write a string
f.writelines()  write a list of strings
# -*- coding: UTF-8 -*-
from ai.params import csv_path1

# Method 1
for line in open(csv_path1):
    print(line)

# Method 2
with open(csv_path1) as f:
    line = f.readline()
    while line:
        print(line, end='\n')
        line = f.readline()

# Method 3
with open(csv_path1) as f:
    lines = f.readlines(10)   # 10 is a size hint in bytes, not a line count
    if lines:
        for line in lines:
            print(line)
Any of utf8, utf-8, UTF8, and UTF-8 works as the encoding name.
$ cat a.txt
a

with open(file="a.txt", mode='r', encoding='utf8') as f:
    line = f.readline()
    print(line)
    line = f.readline()    # past the last line, readline() returns ''
    print(f"--{line}--")
    print(line.strip())    # prints nothing; in Python, methods can still be called on an empty string without error
Getting a file's size (unit: bytes)
fpath = "aa.log"

import os
stt = os.stat(fpath)
print(stt.st_size)  # 5

from pathlib import Path
f = Path(fpath)
size = f.stat().st_size
print(size)  # 5

import os
size = os.path.getsize(fpath)
print(size)  # 5
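All three return a raw byte count. As an aside, a small helper can render it in larger units; this sketch is my own convention, not from the original notes:

import os

def human_size(num_bytes):
    # Hypothetical helper: format a byte count using binary (1024-based) units
    for unit in ('B', 'KB', 'MB', 'GB'):
        if num_bytes < 1024:
            return f"{num_bytes:.1f}{unit}"
        num_bytes /= 1024
    return f"{num_bytes:.1f}TB"

print(human_size(os.path.getsize("aa.log")))  # e.g. '5.0B'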
Reading files in batches

import os

def to_mysql(value_list):
    print(value_list)

base_dir = "need_todbc"
# List every file in the directory
files = [os.path.join(base_dir, file) for file in os.listdir(base_dir)]

value_list_tmp = []
row_num = 0
max_count_todb = 7
for file in files:
    if file.endswith(".txt"):
        with open(file, mode='r', encoding='utf8') as f:
            line = f.readline()
            while line:
                value_list_tmp.append(line.strip())
                line = f.readline()
                row_num = row_num + 1
                if row_num >= max_count_todb:
                    row_num = 0
                    to_mysql(value_list_tmp)
                    value_list_tmp = []
if len(value_list_tmp) > 0:
    to_mysql(value_list_tmp)
    value_list_tmp = []
Batch processing that can be interrupted and resumed
import os

def deal_func(value_list):
    print(value_list)

def batch_read(base_dir="need_todbc", save_file="readed_file.txt", batch_size=5):
    # List every file in the directory
    files = [os.path.join(base_dir, file) for file in os.listdir(base_dir)]
    value_list_tmp = []
    value_list_old = []
    row_num = 0
    # Load the names of files already processed in a previous run
    if os.path.exists(save_file):
        readed_file = open(save_file, 'r')
        value_list_old = readed_file.readlines()
        readed_file.close()
        line_num = 0
        for line in value_list_old:
            value_list_old[line_num] = line.strip()
            line_num = line_num + 1
    write_file = open(save_file, 'a+')
    for file in files:
        if file.endswith(".txt") and (file not in value_list_old):
            with open(file, mode='r', encoding='utf8') as f:
                line = f.readline()
                while line:
                    value_list_tmp.append(line.strip())
                    line = f.readline()
                    row_num = row_num + 1
                    if row_num >= batch_size:
                        row_num = 0
                        deal_func(value_list_tmp)
                        value_list_tmp = []
            if len(value_list_tmp) > 0:
                deal_func(value_list_tmp)
                value_list_tmp = []
            value_list_old.append(file)  # record the file once it has been fully processed
            write_file.write(f"{file}\n")
    write_file.close()

batch_read(base_dir="need_todbc", save_file="readed_file.txt")
Reading the last n lines of a file

import os

def last_line(filename, last_n=2):
    """Read the last n lines of a file, stripping leading/trailing whitespace.

    fp.seek(offset, whence):
    - whence = 0, 1, 2 means offset from the start of the file, the current
      position, or the end of the file; the default is 0
    - whence = 2 requires the file to be opened in binary mode ('rb')
    """
    try:
        filesize = os.path.getsize(filename)
        if filesize == 0:
            return None
        with open(filename, 'rb') as fp:  # seeking from the end requires 'rb'
            offset = -16
            while -offset < filesize:
                fp.seek(offset, 2)
                lines = fp.readlines()
                # require one extra line: the seek may land mid-line, so the
                # first element of lines can be a partial line
                if len(lines) > last_n:
                    res = lines[-last_n:]
                    return [(v.decode("utf8")).strip() for v in res]
                offset *= 2
            # Boundary case: the whole file holds no more than last_n lines
            fp.seek(0)
            lines = fp.readlines()
            return [(v.decode('utf8')).strip() for v in lines[-last_n:]]
    except FileNotFoundError:
        print(filename + ' not found!')
        return None
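A hedged usage example (the log file name aa.log is hypothetical):

# Returns a list of stripped lines, or None if the file is empty or missing
tail = last_line("aa.log", last_n=2)
if tail is not None:
    print(tail)  # e.g. ['line 4', 'line 5']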
strip() can remove specified characters from both ends of a string.
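For example, the argument to strip() is treated as a set of characters, not a substring:

s = "xxhello worldxx"
print(s.strip("x"))          # 'hello world' -- every leading/trailing 'x' is removed
print("{aa=1}".strip("{}"))  # 'aa=1' -- both braces removed in one call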
$ cat /opt/tpf/aiwks/datasets/text/a.cvs
{aa=1,bb=2}

from ai.params import csv_path1

with open(csv_path1) as f:
    lines = f.readlines(64)
    if lines:
        for line in lines:
            # strip whitespace first, so the trailing '}' really is at the end
            kv = line.strip().replace(" ", "").strip('{').strip('}').split(",")
            row = {}
            for elem in kv:
                tmp = elem.split("=")
                row[tmp[0]] = tmp[1]
            print(row)  # {'aa': '1', 'bb': '2'}
Reading a zipped CSV with pandas
import zipfile
import pandas as pd

zp = zipfile.ZipFile("data/aa.zip", 'r')
names = zp.namelist()
f = zp.open(names[0])
data = pd.read_csv(f)
f.close()
Reading a tar.gz file in Python
import tarfile
import pandas as pd

tar = tarfile.open('data/aa.tar.gz')
name = tar.getnames()    # names of the archived files, as a list
tar.extractall('./tmp')  # directory to extract into
df = pd.read_csv('./tmp/' + name[0])
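tarfile can also hand pandas a file object directly, without writing anything to disk; a minimal sketch, assuming the same archive path:

import tarfile
import pandas as pd

with tarfile.open('data/aa.tar.gz') as tar:
    member = tar.getnames()[0]
    f = tar.extractfile(member)  # file-like object, or None if the member is not a regular file
    if f is not None:
        df = pd.read_csv(f)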
Reading a zip archive in Python
import zipfile
import pandas as pd

class ZipReader(object):
    '''Read a zip archive in Python'''

    def __init__(self, zip_path):
        '''
        :param zip_path: path to the zip file
        '''
        self.zip = zipfile.ZipFile(zip_path, 'r')  # open the archive

    def get_filecount(self):
        '''
        :return: number of files inside the archive
        '''
        return len(self.zip.namelist())

    def get_files(self):
        '''Generator: yields the content of one file at a time'''
        for name in self.zip.namelist():
            yield self.read_lines(name)

    def read_lines(self, name):
        '''Return a list with one element per line'''
        return [line.decode() for line in self.zip.open(name).readlines()]

    def get_filenames(self):
        '''Names of all files inside the archive'''
        return self.zip.namelist()

    def extract_to(self, path):
        '''Extract the archive to the given path'''
        self.zip.extractall(path)
        return path

    def read_csv(self, name):
        '''Read a single csv file into a pandas DataFrame'''
        f = self.zip.open(name)
        data = pd.read_csv(f)
        f.close()
        return data

    def read_csvs(self):
        '''Generator: yields one csv file's content at a time'''
        for name in self.zip.namelist():
            yield self.read_csv(name)

if __name__ == "__main__":
    zp = ZipReader(zip_path="data/aa.zip")
    print("file count:", zp.get_filecount())
    print("file names:", zp.get_filenames())
    print("contents of all files:", str(list(zp.get_files())))
    print("extracted to:", zp.extract_to("./test"))
    for data in zp.read_csvs():
        print(type(data))  # <class 'pandas.core.frame.DataFrame'>
        break
Counting files of a given format in a directory
import datetime
import os

def file_counts(filePath="/tmp/logs", nearest_days=7, expect_count=7):
    """Count the files generated over the last few days; file names start with a date."""
    file_num = 0
    for i in range(1, nearest_days + 1):
        day = datetime.datetime.now() - datetime.timedelta(days=i)
        day = day.strftime('%Y-%m-%d')
        day_num = 0
        # os.listdir returns both files and sub-directories
        for file in os.listdir(filePath):
            if file.endswith(".zip") and file.startswith(day):
                file_num += 1
                day_num += 1
        if day_num != expect_count:
            print(day, day_num)
    print(f'Total files over the last {nearest_days} days: {file_num}')

file_counts()
in or not in
"go" in "good" # True "bad" not in "good" # True
Using slices
a="a,a,a," a[:-1] # 'a,a,a'
writelines
import time

start_time = time.time()  # record the start time
for i in range(1000):
    with open('example.txt', 'w') as file:
        lines = []
        for j in range(1000):   # the original reused i here, shadowing the outer loop variable
            lines.append('{}'.format(j))
        file.writelines(lines)
end_time = time.time()  # record the end time
tim_ms = round((end_time - start_time) * 1000, 2)
print("Elapsed time:", tim_ms, "ms")
Elapsed time: 2629.4 ms
import time

start_time = time.time()  # record the start time
for i in range(10000):
    with open('example2.txt', 'w') as file:
        file.write('{}'.format(i))
end_time = time.time()  # record the end time
tim_ms = round((end_time - start_time) * 1000, 2)
print("Elapsed time:", tim_ms, "ms")
Elapsed time: 21376.61 ms. That is the cost of 10,000 open-write-close cycles; next, measure the time without repeatedly opening and closing the file.
import time

file = open('example3.txt', 'a')
start_time = time.time()  # record the start time
for i in range(1000000):
    file.write('{}'.format(i))
file.close()
end_time = time.time()  # record the end time
tim_ms = round((end_time - start_time) * 1000, 2)
print("Elapsed time:", tim_ms, "ms")
Elapsed time: 449.95 ms, even with 1,000,000 writes. Evidently, repeatedly opening and closing the file is what dominates the cost when the data volume is large.
Writing an object to a file as readable text

def write(obj, file_path):
    """Write the object's string form to the file, so the content stays
    human-readable and searchable when the file is opened directly."""
    ss = str(obj)
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(ss)

def read(file_path):
    with open(file_path, 'r', encoding="utf-8") as f:
        c = eval(f.read())  # rebuilds the object from its string form
    return c
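Note that eval() executes arbitrary code, so read() is only safe on trusted files. A variant that swaps in ast.literal_eval, which accepts Python literals only; this substitution is mine, not from the original notes:

import ast

def read_safe(file_path):
    # Hypothetical safer variant: literal_eval parses dicts/lists/strings/numbers
    # and raises ValueError on anything executable
    with open(file_path, 'r', encoding="utf-8") as f:
        return ast.literal_eval(f.read())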
mode  access      unit     behavior
r     read-only   text     file must exist
r+    read/write  text     file must exist
rb    read-only   binary   file must exist
rb+   read/write  binary   file must exist
w     write-only  text     created if missing, truncated if it exists
w+    read/write  text     created if missing, truncated if it exists
wb    write-only  binary   created if missing, truncated if it exists
wb+   read/write  binary   created if missing, truncated if it exists
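A minimal sketch of the difference between 'w' (truncate) and 'a' (append), using a throwaway file name:

with open('mode_demo.txt', 'w') as f:   # hypothetical file; existing content is discarded
    f.write('first\n')
with open('mode_demo.txt', 'a') as f:   # existing content is kept; writes go to the end
    f.write('second\n')
with open('mode_demo.txt', 'r') as f:
    print(f.read())   # first\nsecond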
f.read()
f = open("a.txt",mode="rb") print(f.read())#一次读取所有内容 f.close()
b'a\nd\nd \na\n\xe5\x9c\xa8\n\xe5\x9c\xa8\xe8\xa6\x81\xe5\xb7\xa5\n'
f.read(size)
Reads at most size bytes in one call; in text mode the argument counts characters rather than bytes.
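A minimal loop that consumes a file in fixed-size chunks, reusing the a.txt from above (the 4-byte chunk size is arbitrary):

with open("a.txt", "rb") as f:
    chunk = f.read(4)      # at most 4 bytes per call in 'rb' mode
    while chunk:           # read() returns b'' once the end of file is reached
        print(chunk)
        chunk = f.read(4)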