TensorFlow Data Preprocessing and Shared Variables

Ways of Reading Data

TensorFlow supports both single-machine and distributed execution. Its API is organized around four concepts: Graph, Session, Runtime, and Device, roughly analogous to Program, Process, OS, and PC in ordinary software development. A Session runs an instance of a Graph on some Device within some Runtime; a Session can run only one Graph, while one Graph can be used by multiple Sessions.
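
A minimal sketch of that Graph/Session relationship using the TF 1.x API (the graph g and the constants are invented purely for illustration):

import tensorflow as tf

g = tf.Graph()
with g.as_default():
    a = tf.constant(3, name="a")
    b = tf.constant(4, name="b")
    c = tf.add(a, b, name="c")

# Two independent Sessions, each running an instance of the same Graph.
sess1 = tf.Session(graph=g)
sess2 = tf.Session(graph=g)
print(sess1.run(c))  # 7
print(sess2.run(c))  # 7
sess1.close()
sess2.close()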

TensorFlow supports several frontend languages such as Python; they are all translated through the C/C++ API into the execution engine. In the distributed case, much like the Hadoop/Spark ecosystem, the master node analyzes the computation graph and dispatches work to the worker nodes over gRPC (Protocol Buffers) for execution. In actual training the GPU itself is fast; the bottleneck is IO, that is, how the data gets into the model.

The first approach is reading constants directly: the data is embedded in the graph itself and carried into every session that executes that graph. This only suits very small amounts of data:

import tensorflow as tf
# The data lives in the graph as constants.
x = tf.constant([1, 2, 3], name='x')
y = tf.constant([2, 3, 4], name='y')
z = tf.add(x, y, name='z')
with tf.Session() as sess:
    print(sess.run(z))  # [3 5 7]

The second approach is placeholder + feed_dict, which is the most common. It resembles Hadoop in spirit: the graph moves between devices, while the data is read locally at run time and fed in.

import tensorflow as tf
# Placeholders are typed slots that get filled at run time via feed_dict.
x = tf.placeholder(tf.int16)
y = tf.placeholder(tf.int16)
z = tf.add(x, y, name='z')
with tf.Session() as sess:
    xs = [1, 2, 3]
    ys = [2, 3, 4]
    print(sess.run(z, feed_dict={x: xs, y: ys}))  # [3 5 7]
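
In practice feed_dict is usually driven by a Python loop that slices a NumPy array into mini-batches; a minimal sketch (the arrays data_x/data_y and the batch size are invented for illustration):

import numpy as np
import tensorflow as tf

data_x = np.arange(12, dtype=np.int16)
data_y = np.arange(12, dtype=np.int16) * 2
batch_size = 4

x = tf.placeholder(tf.int16, name="x")
y = tf.placeholder(tf.int16, name="y")
z = tf.add(x, y, name="z")

with tf.Session() as sess:
    # Slice the arrays into mini-batches and feed one batch per run call.
    for start in range(0, len(data_x), batch_size):
        xs = data_x[start:start + batch_size]
        ys = data_y[start:start + batch_size]
        print(sess.run(z, feed_dict={x: xs, y: ys}))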

The third approach is a producer-consumer pipeline: one set of threads reads files while the computation runs asynchronously, so the GPU does not sit idle waiting for disk IO:

import tensorflow as tf
# The files to read
files = ['./data/A.csv', './data/B.csv']
filenames = tf.train.match_filenames_once(files)
# The second argument controls whether the file order is shuffled;
# the third means each file is passed over 3 times
filename_queue = tf.train.string_input_producer(filenames, shuffle=False, num_epochs=3)
# A reader that reads the files one line at a time
reader = tf.TextLineReader()
# value is the file content (one line)
_, value = reader.read(filename_queue)
# Parse the csv line into column values; record_defaults defines the columns.
# After decoding, the parsed values are pushed into an in-memory queue for us;
# all that is left to do is loop over it.
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
init_op = tf.local_variables_initializer()
with tf.Session() as sess:
    sess.run(init_op)
    # Thread coordinator
    coord = tf.train.Coordinator()
    # Start the QueueRunner threads
    threads = tf.train.start_queue_runners(coord=coord)
    # This loop keeps pulling the two values out of the files.
    # num_epochs is 3 and the two files together hold 6 records, so at most
    # 18 iterations are possible; going beyond that raises an error.
    for _ in range(18):
        print(sess.run([example, label]))
    # Shut the threads down when done
    coord.request_stop()
    coord.join(threads)

In machine learning it is more common to read a batch of data at a time:

import tensorflow as tf
files = ['./data/A.csv', './data/B.csv']
filenames = tf.train.match_filenames_once(files)
filename_queue = tf.train.string_input_producer(filenames, shuffle=False, num_epochs=3)
reader = tf.TextLineReader()
_, value = reader.read(filename_queue)
# Two ways of reading the data in batches
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
example_batch, label_batch = tf.train.batch([example, label], batch_size=4, capacity=10)
# record_list = [tf.decode_csv(value, record_defaults=[['null'], ['null']]) for _ in range(2)]
# example_batch, label_batch = tf.train.batch_join(record_list, batch_size=4, capacity=10)
init_op = tf.local_variables_initializer()
with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    # A safer way to drain the queue
    try:
        while not coord.should_stop():
            print(sess.run([example_batch, label_batch]))
    except tf.errors.OutOfRangeError:
        print('Epochs complete!')
    finally:
        coord.request_stop()
    coord.join(threads)
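
If the records should also be shuffled rather than read in file order, tf.train.shuffle_batch can be swapped in for tf.train.batch. A sketch assuming the same example/label tensors as above (the capacity and min_after_dequeue values are arbitrary; capacity must exceed min_after_dequeue):

# min_after_dequeue is how many records stay buffered to shuffle from.
example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=4, capacity=100, min_after_dequeue=20)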

Generating and Reading TFRecords

In practice, TFRecord binary files are usually used so that the rest of the processing code stays uniform in form. The format is defined with protocol buffers and is essentially a map structure; the TFRecord files are generally loaded onto every machine before training:

message Example {
    Features features = 1;
};
message Features {
    map<string, Feature> feature = 1;
};
message Feature {
    oneof kind {
        BytesList bytes_list = 1;
        FloatList float_list = 2;
        Int64List int64_list = 3;
    }
};

For example, converting a csv file to TFRecord:

import tensorflow as tf
import numpy as np
import pandas as pd
# Read a csv file converted from the MNIST dataset
train_frame = pd.read_csv("train.csv")
# Split off the last column as the label
train_labels_frame = train_frame.pop(item="label")
# Separate the feature values from the label values
train_values = train_frame.values
train_labels = train_labels_frame.values
writer = tf.python_io.TFRecordWriter("csv_train.tfrecords")
# For each record
for i in range(train_values.shape[0]):
    # Fill in the pb structure
    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                "image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[train_values[i].tostring()])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[train_labels[i]]))
            }
        )
    )
    # SerializeToString serializes the map into a binary string for storage
    writer.write(record=example.SerializeToString())
writer.close()
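
As a quick sanity check on the file just written, the records can be iterated and re-parsed in plain Python with tf.python_io.tf_record_iterator; a sketch (the field names match the writer above):

count = 0
for record in tf.python_io.tf_record_iterator("csv_train.tfrecords"):
    if count == 0:
        # Parse the serialized bytes back into an Example and peek at the label.
        parsed = tf.train.Example.FromString(record)
        print(parsed.features.feature["label"].int64_list.value)
    count += 1
print("records written:", count)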

Preprocessing images is similar:

### convert pics to tfrecord
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import tensorflow as tf
import pandas as pd
# Fake a label for demonstration
def get_label_from_filename(filename):
    return 1
filenames = ['./tensorflow_system.png']
# filenames = tf.train.match_filenames_once(files)
writer = tf.python_io.TFRecordWriter('png_train.tfrecords')
for filename in filenames:
    # Read the image into an array and convert it to a byte string
    img_raw = mpimg.imread(filename).tostring()
    # The label used for supervised training
    label = get_label_from_filename(filename)
    # Same structure as above
    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                "image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
            }
        )
    )
    writer.write(record=example.SerializeToString())
writer.close()
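
One detail to keep in mind: tostring() throws away the array shape. A common workaround, sketched below, is to store the height and width as extra Int64 features so the reader can reshape the decoded bytes (the output file name png_train_with_shape.tfrecords is made up):

import matplotlib.image as mpimg
import tensorflow as tf

img = mpimg.imread('./tensorflow_system.png')
writer = tf.python_io.TFRecordWriter('png_train_with_shape.tfrecords')
example = tf.train.Example(
    features=tf.train.Features(
        feature={
            "image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.tostring()])),
            # Store the spatial dimensions alongside the raw bytes.
            "height": tf.train.Feature(int64_list=tf.train.Int64List(value=[img.shape[0]])),
            "width": tf.train.Feature(int64_list=tf.train.Int64List(value=[img.shape[1]])),
            "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[1]))
        }
    )
)
writer.write(example.SerializeToString())
writer.close()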

Reading TFRecord files

import tensorflow as tf
# This reader reads one Example at a time
reader = tf.TFRecordReader()
filename_queue = tf.train.string_input_producer(["csv_train.tfrecords"])
_, serialized_record = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_record,
    features={
        # tf.FixedLenFeature returns a Tensor
        # tf.VarLenFeature returns a SparseTensor
        "image_raw": tf.FixedLenFeature([], tf.string),
        "label": tf.FixedLenFeature([], tf.int64),
    })
# SerializeToString was used when writing, so decode_raw the bytes back;
# the dtype here must match the dtype of the array that was serialized
images = tf.decode_raw(features["image_raw"], tf.uint8)
labels = tf.cast(features["label"], tf.int32)
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for i in range(5):
        image, label = sess.run([images, labels])
        print(image)
        print(label)
    coord.request_stop()
    coord.join(threads)
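
The same pipeline extends to mini-batches by feeding the parsed tensors into tf.train.shuffle_batch. A sketch, assuming csv_train.tfrecords was written from 784-pixel MNIST rows stored as int64 (the pandas default); the decode dtype and the fixed length are assumptions about that file:

import tensorflow as tf

reader = tf.TFRecordReader()
filename_queue = tf.train.string_input_producer(["csv_train.tfrecords"])
_, serialized_record = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_record,
    features={
        "image_raw": tf.FixedLenFeature([], tf.string),
        "label": tf.FixedLenFeature([], tf.int64),
    })
# Decode with the dtype that was serialized, then give the tensor a static
# shape, which tf.train.shuffle_batch requires.
image = tf.decode_raw(features["image_raw"], tf.int64)
image.set_shape([784])
label = tf.cast(features["label"], tf.int32)
image_batch, label_batch = tf.train.shuffle_batch(
    [image, label], batch_size=32, capacity=1000, min_after_dequeue=500)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(label_batch).shape)  # (32,)
    coord.request_stop()
    coord.join(threads)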

Variables in the Computation Graph

If the variables in a computation graph are defined carelessly, the TensorBoard graph becomes very cluttered. Below are a few patterns; all of the code first needs these imports:

import numpy as np
import tensorflow as tf
from datetime import datetime

The first way of defining the ops: everything is defined repeatedly, which is hard to maintain.

X1 = tf.placeholder(tf.float32, shape=(None, 3), name="X1")
w1 = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights1")
b1 = tf.Variable(0.0, name="bias1")
z1 = tf.add(tf.matmul(X1, w1), b1, name="z1")
relu1 = tf.maximum(z1, 0., name="relu1")
X2 = tf.placeholder(tf.float32, shape=(None, 3), name="X2")
w2 = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights2")
b2 = tf.Variable(0.0, name="bias2")
z2 = tf.add(tf.matmul(X2, w2), b2, name="z2")
relu2 = tf.maximum(z2, 0., name="relu2")
output = tf.add(relu1, relu2, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    # result1 = sess.run(relu1, feed_dict={X1: [[1,10,100]]})
    # result2 = sess.run(relu2, feed_dict={X2: [[1,10,100]]})
    # print(result1)
    # print(result2)
    out = sess.run(output, feed_dict={X1: [[1,10,100]], X2: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)

The second way of defining the ops: wrap them in a function; TensorBoard collapses the identical nodes.

def relu(X):
    w = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights")
    b = tf.Variable(0.0, name="bias")
    z = tf.add(tf.matmul(X, w), b, name="z")
    return tf.maximum(z, 0., name="relu")
X = tf.placeholder(tf.float32, shape=(None, 3), name="X")
relus = [relu(X) for i in range(5)]
output = tf.add_n(relus, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    out = sess.run(output, feed_dict={X: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)

The third way of defining the ops: put the body of the relu function inside a name scope; TensorBoard then collapses all of the relu ops.

def relu(X):
    with tf.name_scope("relu"):
        w = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights")
        b = tf.Variable(0.0, name="bias")
        z = tf.add(tf.matmul(X, w), b, name="z")
        return tf.maximum(z, 0., name="relu")
X = tf.placeholder(tf.float32, shape=(None, 3), name="X")
relus = [relu(X) for i in range(5)]
output = tf.add_n(relus, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    out = sess.run(output, feed_dict={X: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)

The first way of defining the threshold as a shared variable: let the name_scope inside the function simply capture a variable defined outside it.

threshold = tf.Variable(0.0, name="threshold")
def relu(X):
    with tf.name_scope("relu"):
        w = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights")
        b = tf.Variable(0.0, name="bias")
        z = tf.add(tf.matmul(X, w), b, name="z")
        return tf.maximum(z, threshold, name="relu")
X = tf.placeholder(tf.float32, shape=(None, 3), name="X")
relus = [relu(X) for i in range(5)]
output = tf.add_n(relus, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    out = sess.run(output, feed_dict={X: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)

The second way of sharing the threshold variable: define the variable inside the name_scope, creating it lazily as an attribute of the function on the first call.

def relu(X):
    with tf.name_scope("relu"):
        # The variable is attached to the function itself and created only once
        if not hasattr(relu, "threshold"):
            relu.threshold = tf.Variable(0.0, name="threshold")
        w = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights")
        b = tf.Variable(0.0, name="bias")
        z = tf.add(tf.matmul(X, w), b, name="z")
        return tf.maximum(z, relu.threshold, name="relu")
X = tf.placeholder(tf.float32, shape=(None, 3), name="X")
relus = [relu(X) for i in range(5)]
output = tf.add_n(relus, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    out = sess.run(output, feed_dict={X: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)

The third way of sharing the threshold variable, one of the approaches recommended by TF: define it outside the function with get_variable.

When get_variable is called, the variable is created if it does not exist and fetched otherwise; the exact behavior depends on the variable_scope currently in effect.
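
A minimal sketch of just those create-versus-reuse rules (the scope name "demo" and variable name "v" are arbitrary):

with tf.variable_scope("demo"):
    # The variable does not exist yet, so it is created as demo/v.
    v = tf.get_variable("v", shape=(), initializer=tf.constant_initializer(1.0))
with tf.variable_scope("demo", reuse=True):
    # Same scope with reuse=True: the existing demo/v is fetched, not recreated.
    v2 = tf.get_variable("v")
print(v is v2)  # True
# Asking for an existing name without reuse=True raises a ValueError,
# as does asking for a missing name with reuse=True.

Applied to the relu example, the full pattern looks like this: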

# Outside the function, define how relu/threshold is initialized; the shape
# and initializer are specified here
with tf.variable_scope("relu"):
    # relu/threshold is defined outside the function; because the variable_scope
    # is the same, it gets reused inside the relu function
    threshold = tf.get_variable("threshold", shape=(), initializer=tf.constant_initializer(0.0))
def relu(X):
    # To reuse a variable, reuse=True must be declared explicitly; in that case
    # the shape and initializer do not need to be specified
    with tf.variable_scope("relu", reuse=True):
        # The variable already exists, so get_variable fetches it
        threshold = tf.get_variable("threshold")
        w = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights")
        b = tf.Variable(0.0, name="bias")
        z = tf.add(tf.matmul(X, w), b, name="z")
        return tf.maximum(z, threshold, name="relu")
# Another way to reuse inside the function:
# with tf.variable_scope("relu") as scope:
#     scope.reuse_variables()
#     threshold = tf.get_variable("threshold")
# The version that specifies shape and initializer:
# with tf.variable_scope("relu"):
#     threshold = tf.get_variable("threshold", shape=(), initializer=tf.constant_initializer(0.0))
X = tf.placeholder(tf.float32, shape=(None, 3), name="X")
relus = [relu(X) for i in range(5)]
output = tf.add_n(relus, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    out = sess.run(output, feed_dict={X: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)

The fourth way of sharing the threshold variable: define it inside the function and state explicitly at each call site whether it is being reused, which fixes the scattered-variable problem of the previous approach.

def relu(X):
    w = tf.Variable(np.array([[1.], [10.], [100.]], dtype=np.float32), name="weights")
    b = tf.Variable(0.0, name="bias")
    z = tf.add(tf.matmul(X, w), b, name="z")
    # The variable is defined right inside the relu op and created the first
    # time relu is called
    threshold = tf.get_variable("threshold", shape=(), initializer=tf.constant_initializer(0.0))
    return tf.maximum(z, threshold, name="relu")
X = tf.placeholder(tf.float32, shape=(None, 3), name="X")
# This keeps the variable definition from leaking out of the function,
# but makes the call sites a bit more awkward
relus = []
for relu_index in range(5):
    # Reuse the variable on every call after the first one
    with tf.variable_scope("relu", reuse=(relu_index >= 1)) as scope:
        relus.append(relu(X))
output = tf.add_n(relus, name="output")
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init)
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    logdir = "./lession/graphs/relu-runat-" + now
    writer = tf.summary.FileWriter(logdir, sess.graph)
    out = sess.run(output, feed_dict={X: [[1,10,100]]})
    print(out)
    writer.close()

(TensorBoard graph screenshot)