-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path--python基础
115 lines (102 loc) · 3.63 KB
/
--python基础
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
isinstance判断类型
list(range(100)[::-10])=[99, 89, 79, 69, 59, 49, 39, 29, 19, 9]
s[::-1]和''.join(reversed(s))可以将字符串反转
requests::::get:pramas/auth;post:data;json:json
requests.get(url,timeout=3)
要读取非UTF-8编码的文本文件,需要给open()函数传入encoding参数,例如,读取GBK编码的文件:
>>> f = open('/Users/michael/gbk.txt', 'r', encoding='gbk',errors='ignore')
>>> f.read()
'测试'
open()函数还接收一个errors参数,表示如果遇到编码错误后如何处理
os.mkdir/rmdir()创建/删除文件夹
os.getcwd()取当前位置
os.path.join/split()拼接/分解地址
os.path.splitext()可以直接让你得到文件扩展名,很多时候非常方便:
os.rename('test.txt', 'test.py')重命名文件/文件夹,更改目录的话会移动文件/文件夹
os.remove('test.py')删除文件/文件夹
shutil.copyfile('f1.log', 'f2.log')复制文件f1到f2
def f(cd): 遍历cd下的文件和文件夹
for i in os.listdir(cd):
if os.path.isfile(os.path.join(cd,i)):
print('文件',os.path.join(cd,i))
else:
print(os.path.join(cd,i))
f(os.path.join(cd,i))
>>> import json
>>> d = dict(name='Bob', age=20, score=88)
>>> json.dumps(d)
'{"age": 20, "score": 88, "name": "Bob"}'
>>> json_str = '{"age": 20, "score": 88, "name": "Bob"}'
>>> json.loads(json_str)
{'age': 20, 'score': 88, 'name': 'Bob'}
from multiprocessing import Process 不知道为什么要在cmd运行,否则不显示
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
--------------------------------------------------------------------------------------------------
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
==============================================================
nultiprocessing/subprocess(popen,queue,call等看不懂),分布式看不懂
=======================================================================
import time, threading
bb=0
lock=threading.Lock() 建立线程锁对象
def j(n):
global bb
lock.acquire() 线程锁
try:
for i in range(100000):
bb+=n
bb-=n
finally:lock.release() 解锁
if __name__=='__main__':
pp=[]
for i in range(1,5):
p=threading.Thread(target=j,args=(i,))
pp.append(p)
for i in pp:
i.setDaemon(True)
i.start()
for i in pp:
i.join()
print(bb)
====================================================
import time, threading
lo=threading.local() #交换各线程的局部变量
def f(n):
n=n**2
lo.nn=n
print(threading.current_thread().name,lo.nn)
tt=[]
for _ in range(9):
t=threading.Thread(target=f,args=(_,))
t.setDaemon(True)
t.start()
tt.append(t)
for _ in tt:
_.join()
--------------------------------------------------------------