tensorflow 六种办法构建读入batch样本(含序列特征处理),踩坑经历值得保藏


书接上文,对 图机器学习算法 感兴趣 的 同学 能够去 图算法十篇 之 图机器学习系列文章总结 这儿检查,对 引荐广告算法 感兴趣 的同学 能够去这儿 系列小作文之企业级机器学习pipline总结 检查,干货多多 哦! 而对 运用 tensorflow 完成 杂乱机器学习/深度学习 模型 感兴趣 的同学, 欢迎重视 算法全栈之路 的 大众号 接下来 逐渐更新 的 模型手把手系列 的文章~

本篇是 模型手把手系列 的第二篇文章,本系列的上一篇文章 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 中 咱们 首要阐明 了 怎么用 多种办法 生成 tensorflow 官方引荐 的 数据格局 tfrecord 的 办法,而 本章 咱们则 将继续 看看 tensorflow 怎么 读取 各种类型的特征,特别是序列特征数据 ,并运用 多种办法生成 batch 练习样本,代码 包括 tensorflow 1.x 系列tensorflow 2.x 系列 的 办法,走过 路过 不能错过 哦 !

作者 花了 大量时刻 来 整理 本文章里 介绍的 多种办法 的源码,是由于 在 当初写 图算法相关的文章之 图上 deepwalk 算法理论与实战,图算法之瑞士军刀篇(一) 以及 图上 deepwalk 算法理论 与 tensorflow keras 实战,图算法之瑞士军刀篇(二) 这 两篇 文章的时分, 小小的 batch 数据生成,坑死了我这个混迹于国内外互联网大厂多年的算法老同志~ ,有些问题 没遇到不算事,遇到了 找bub 真是要了我的小命了。 闲言少叙 ,就看文章干不干,转需吧 ~

本文首要讲解了 6 种 用tensorflow 1.x / 2.x 怎么读取 练习数据,特别是 序列特征数据 的 处理办法。由于 这些办法 有着 各自的运用场景 和 各自运用特色,算是 对 前次遇坑 的报复性处理心理 吧,这儿 我 悉数 把开发列举 出来了,期望 能够 实在的协助到 同样 遇到问题的老哥 。

老话说得好: 代码是表达程序员思想的最好语言。本文的 数据读入代码 ,刻意 分析了 运用 tensorflow 多种办法读入 用户前史行为序列特征 的过程 ,代码 每个 单元cell 均能够 独立完美 运转成功,具有极高的 参阅价值 哦。详细内容 直接 看代码 吧!~~


(1)代码韶光

本文共介绍了 6种 tensorflow 读取数据 并 batch 练习 的办法,包括运用 slice_input_producer、from_tensor_slices、generate、interleave 以及 自定义 生成batch 数据 等办法,下面就让咱们 一种一种办法 的 介绍吧,总有一种合适你的。

(1.0) 数据预备

本文 用到的 数据,从 内存中 读取csv的,咱们在这儿直接列出; 而运用到 tfrecord 的,咱们则运用的上文 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 中python办法单机版生成的tfrecord 数据。

@欢迎重视作者大众号算法全栈之路
importpandasaspd
raw_df=pd.DataFrame([[28,12.1,'male',[1,2],1],[30,8.7,'female',[3,4,5],0],[32,24.6,'female',[6,7,8,9,10],1]],columns=['age','price','sex','click_list','label'])
#序列特征长度不够填充,运用tf.train.batch生成batch有必要要定长序列
max_len=5
padding_value=0
raw_df['click_list']=raw_df['click_list'].apply(lambdax:x+[padding_value]*(max_len-len(x)))
raw_df['click_list_str']=raw_df['click_list'].apply(lambdax:'#'.join(map(str,x)))
#普通特征处理
raw_df['age']=raw_df['age'].astype(str)
raw_df['sex']=raw_df['sex'].astype(str)
raw_df['label']=raw_df['label'].astype(str)
print(raw_df)
raw_df.to_csv("read_sample.csv",sep='\t',index=False)

代码 很 简单,我就 不赘述 了。

中间要 留意的是:click_list 这一列特征便是 序列特征 ,每个用户 的 前史行为序列 的长度 并非定长 ,可是在某些办法里, 生成batch特征的时分,要求list 类型的数据是定长的 ,所以 我这儿 用 默许值 0 进行了 padding 填充 。


(1.1)tensorflow 1.x 运用 slice_input_producer 生成 batch 数据

看代码吧。

@欢迎重视作者大众号算法全栈之路
importtensorflow.compat.v1astf
tf.compat.v1.disable_eager_execution()
#创立输入数据行列
input_queue=tf.train.slice_input_producer(
[raw_df['age'].to_list(),raw_df['price'].to_list(),raw_df['sex'].to_list(),raw_df['click_list'].to_list(),raw_df['label'].to_list()],
shuffle=True
)
#读取行列中的数据
all_sample_count=len(raw_df)
batch_size=2
num_threads=1
min_after_dequeue=1
all_feature_batch=tf.train.batch(
input_queue,
batch_size=batch_size,
num_threads=num_threads,
capacity=min_after_dequeue+(num_threads+1)*batch_size
)
#打印输出成果
withtf.Session()assess:
#初始化变量
sess.run(tf.global_variables_initializer())
#发动行列操作
coord=tf.train.Coordinator()
threads=tf.train.start_queue_runners(coord=coord)
foriinrange(all_sample_count//batch_size):
age_batch,price_batch,sex_batch,click_list_batch,label_batch=sess.run(all_feature_batch)
print(f"age_batch:{age_batch}\nprice_batch:{price_batch}\nsex_batch:{sex_batch}nclick_list_batch:{click_list_batch}nlabel_batch:{label_batch}")
coord.request_stop()
coord.join(threads)

这儿这儿的 tf 是 import tensorflow.compat.v1 as tf ,适配于 tensorflow 1.x 系列 的模型。

这儿首要用了 tf.train.slice_input_producer 和 tf.train.batch 数据来 生成batch 数据。

仍是 要点说下序列特征 列 click_list_batch ,这儿 读入的 是一个 前史点击行为序列ID list,是 定长的 int 型 。定长 那就好办了呀 ,直接 接embeding matrix 拿到每个 id 对应 的 embeding 然后扔进模型里去。

这个 cell 里 的 代码是能够 跑通的,假如的确协助到你了,欢迎 重视作者的大众号 凑个份子~


(1.2)tensorflow 2.0 直接运用 from_tensor_slices 生成 batch 数据

@欢迎重视作者大众号算法全栈之路
importtensorflowastf
tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())
importpandasaspd
batch_size=3
max_len=5
raw_df['click_list']=raw_df['click_list'].apply(lambdax:'#'.join(map(str,x)))
raw_df['age']=raw_df['age'].astype(str)
raw_df['price']=raw_df['price'].astype(str)
dataset=tf.data.Dataset.from_tensor_slices((raw_df[['age','price','sex','click_list']].values,raw_df['label'].values))
dataset=dataset.shuffle(buffer_size=len(raw_df)).batch(batch_size)
#Iterateoverthebatches
forbatchindataset:
features,labels=batch
#定位到序列特征所在位置
str_list_batch=features[:,3:4]
list_feature=tf.strings.split(str_list_batch,"#")
#输出是一个SparseTensorValue目标
#https://blog.csdn.net/ustbbsy/article/details/116644136
print("ccccc:",list_feature.values)
print(list_feature.shape)

print('Features:',features)
#另一种定位序列特征的办法
print('Features(1):',features[1][3])
print('Labels:',labels)
print()

留意: 由于 我本机mac 的 tensorflow 版本是 2.6.0 的版本,所以这儿tf默许便是2.6.0了。

咱们 能够运用

tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())

来承认 是否发动了 tensorflow 2.x系列的 eager 形式

这儿仍是 要点说一些 序列特征吧,这儿读入的是 把序列特征拼接成一个字符串 ,然后在对 每个batch里 进行 字符串的切割,咱们 这儿用到的 办法 是 :

str_list_batch=features[:,3:4]
list_feature=tf.strings.split(str_list_batch,"#")

留意 tf.strings.split 的 回来是一个 SparseTensorValue 目标, .values 特点能够拿到详细的值。

由于是 把序列特征拼接成了字符串,所以 咱们这儿 不要求序列长度是定长 的,非定长的序列特征处理 得到 SparseTensorValue 之后,咱们能够运用 tf.Variable 或 tf.keras.layers.Embedding 来创立该嵌入矩阵。 最终,咱们能够运用 tf.nn.embedding_lookup_sparse()函数 来获取 嵌入向量。

最终在着重一点 便是:关于支撑 eager形式的 dataset, 咱们能够直接用for循环以及dict 来获取对应特征的取值 哦,十分方便,十分强大 ,运用前 留意承认 eager形式 是否敞开


(1.3)运用 dataset 的 generate 生成 batch 数据

关于 数据量不太大 的练习数据,许多同学 仍是 习气 运用 python 的 yeild 来 构建generator , 所以 咱们 也供给 了 根据 generator 来 生成 batch 样本 的办法,看代码吧 ~

@欢迎重视作者大众号算法全栈之路
importtensorflowastf
importpandasaspd
importnumpyasnp
#创立一个虚拟的pandasdataframe
df=pd.DataFrame({
'float_col':np.random.rand(3),
'int_col':np.random.randint(0,10,size=(3)),
'str_col':['string{}'.format(i)foriinrange(3)],
'list_col':[[i,i+1]foriinrange(3)]
})
print(df)
#创立一个生成器函数,用于将pandasdataframe转化为Tensorflow数据集
defgenerator():
forindex,rowindf.iterrows():
yield(
{
'float_input':row['float_col'],
'int_input':row['int_col'],
'str_input':row['str_col'],
'list_input':row['list_col']
},
row['int_col']#将int_col作为标签
)
#创立Tensorflow数据集
dataset=tf.data.Dataset.from_generator(generator,
output_signature=(
{
'float_input':tf.TensorSpec(shape=(),dtype=tf.float32),
'int_input':tf.TensorSpec(shape=(),dtype=tf.int32),
'str_input':tf.TensorSpec(shape=(),dtype=tf.string),
'list_input':tf.TensorSpec(shape=(2,),dtype=tf.int32)
},
tf.TensorSpec(shape=(),dtype=tf.int32)
))
#对数据进行批次处理
batch_size=8
dataset=dataset.batch(batch_size)
#打印数据集中的第一个批次
forfeature_batch,label_batchindataset:
print('float_input:',feature_batch['float_input'])
print('int_input:',feature_batch['int_input'])
print('str_input:',feature_batch['str_input'])
print('list_input:',feature_batch['list_input'])
print('label:',label_batch)

这儿的 要点 依然是 序列特征的处理 ,关于 定长 以及 非定长 的 序列特征,本文前面均 进行了 阐明,这儿 我就不在 着重 了,往上翻去 找找 就能够 看到哦。


(1.4)运用dataset 的 interleave 接口去读取 txt 样本文本文件

接下来 要介绍 的 两种办法,才是 咱们在 工业上 大数据场景下 实际运用的 十分多的 特征数据 读入办法,看代码吧~

@欢迎重视作者大众号算法全栈之路
importtensorflowastf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)
#练习集所有的列
TRAIN_SET_ALL_COLUMNS=["age","price","sex","click_list","label","click_list_str"]
#没有用到的列,这儿把去掉
TRAIN_SET_USELESS_COLUMN_NAMES=['click_list']
#并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2
defparse_txt_line(line,label_dtype):
iflabel_dtype==tf.dtypes.float32:
label_default_value=0.0
else:
label_default_value=0
#int64类型的默许值,用long(0)也不好使,要设置一个真正大于int32的数值
#默许值个数有必要和读入个数共同,很重要ValueError:notenoughvaluestounpack(expected12,got4)
#整数默许是[1<<32]
#默许值很重要,格局不对会导致这个问题
#ValueError:ColumndtypeandSparseTensorsdtypemustbecompatible.key:adid,columndtype:
#<dtype:'string'>,tensordtype:<dtype:'int64'>
field_defaults=[[""],[""],[""],[""],[label_default_value],[""]]
#从csv格局中解分出这些字段
age,price,sex,click_list,label,click_list_str=tf.io.decode_csv(line,field_defaults,field_delim="\t")
#对一些字段运用tf.cast进行类型转化,这儿完全不需要,下游有进行hash
#adid=tf.cast(adid,tf.dtypes.int32)
#|号分隔,tf.strings.to_number把字符串转化为默许浮点数
#user_click_seq=tf.strings.to_number(tf.strings.split(user_click_seq,sep="|"))
label=tf.cast(label,tf.int64)
fields_values=[age,price,sex,click_list,label,click_list_str]
features=dict(zip(TRAIN_SET_ALL_COLUMNS,fields_values))
#没有用到de列,需要pop出去
foruseless_column_nameinTRAIN_SET_USELESS_COLUMN_NAMES:
features.pop(useless_column_name)
label=features.pop("label")
#回来一个dict{feature_name,value}和label
returnfeatures,label
defget_text_dataset(data_set_path_list,label_dtype):
filenames_dataset=tf.data.Dataset.from_tensor_slices(data_set_path_list)
raw_dataset=filenames_dataset.interleave(
#2个线程并行去读TextLineDataset
lambdax:tf.data.TextLineDataset(x,num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
#NUM_PARALLEL_FOR_DATASET=2
cycle_length=NUM_PARALLEL_FOR_DATASET,
block_length=BATCH_SIZE,
num_parallel_calls=NUM_PARALLEL_FOR_DATASET
)
raw_dataset=raw_dataset.\
map(lambdaline:parse_txt_line(line,label_dtype),num_parallel_calls=NUM_PARALLEL_FOR_DATASET).\
apply(tf.data.experimental.ignore_errors())
#格局dict(fea_name,value),label
returnraw_dataset
train_set_path_list=["read_sample.csv"]
train_raw_dataset=get_text_dataset(train_set_path_list,label_dtype=tf.dtypes.int64)
forfeature_batch,label_batchintrain_raw_dataset:
print(feature_batch['age'])
print(label_batch)

这儿的 代码 是 工业大数据场景下 常用的办法,咱们运用 tf.data.Dataset.from_tensor_slices 接口,一般会 先运用 tf.io.gfile 相关的接口 读取到 hdfs 大数据集群上的 文件路径 ,然后 tf.data.TextLineDataset并行读取,这儿 的 办法 首要调用了 parse_txt_line 这个办法 来解析单行 的 样本文件。

这儿的 序列特征,咱们能够在 parse_txt_line 用 python办法 把处理成 list 数据,可是 要求定长,详细办法 看本文 开始的时分 的处理办法。当然,也能够在 获得 batch 得时分 用 tf.strings.split 进行处理,和上面开篇 第二种 办法相同。

更近一步,乃至 咱们能够 将 序列特征字符串 一直 放到 模型里 去处理 都是能够的。


(1.5) 运用dataset 的 interleave 接口去读取 tfrecord 文件

这个办法是 企业级机器学习pipline 处理大数据量下 模型练习 用到最多 的办法,乃至 tfrecord 能够 兼容语音图画 等格局,这一块 感兴趣 的 同学自己下去 检查材料 吧,咱们 这儿 首要介绍的 都是 数值以及字符串列表搜广推算法 更多用到 的 特征数据。

@欢迎重视作者大众号算法全栈之路
importtensorflowastf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)
#并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2
defget_tf_record_dataset(data_set_path_list,shuffle=True):
files=tf.data.Dataset.list_files(data_set_path_list,shuffle=shuffle)
dataset=files.apply(
tf.data.experimental.parallel_interleave(
lambdax:tf.data.TFRecordDataset(x,num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
cycle_length=NUM_PARALLEL_FOR_DATASET,
block_length=BATCH_SIZE,
sloppy=False
)
)

#parsing_spec是一个字典,它供给了每个特征到"FixedLenFeature"或"VarLenFeature"的映射
parsing_spec={
'age':tf.io.FixedLenFeature([1],tf.int64),
'price':tf.io.FixedLenFeature([1],tf.float32),
'gender':tf.io.FixedLenFeature([1],tf.string),
'click_list':tf.io.VarLenFeature(tf.int64),
'label':tf.io.FixedLenFeature([1],tf.int64)
}

defread_batch(serialized):

feature=tf.io.parse_example(serialized,features=parsing_spec)
label=feature['label']
returnfeature,{"label":label}
raw_tfrecord_data=dataset.map(read_batch,NUM_PARALLEL_FOR_DATASET)
#格局dict(fea_name,value),label
returnraw_tfrecord_data
train_set_path_list=["py_tf_record"]
train_raw_dataset=get_tf_record_dataset(train_set_path_list)
forfeature_batch,label_batchintrain_raw_dataset:
print("age:",feature_batch['age'])
#这儿的click_list回来的是一个SparseTensor,用.values办法能够得到值。
print("click_list:",feature_batch['click_list'].values)
print('label:',label_batch)

特别引荐 这儿介绍 的 处理数据 的办法,将练习数据 保存为 tfrecord 格局,不仅 速度快 而且 节约存储 空间,对 生成 tfrecord 数据 不熟悉的 同学,能够 去看 作者的 上一篇 文章 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 。

这儿 要点要着重 的是 parsing_specread_batch 办法,parsing_spec 中 定义来 定长和变长 tfrecord 数据 的 解析办法,十分优异,读出来得 序列特征 是 变长的 SparseTensor, 后边 处理得到 embeding 的办法,可与参阅 上面文章 介绍的 SparseTensor 得到 embeding 得 部分内容 哦,这儿我 就也 不再赘述 了。

本文 到这儿 ,咱们 共介绍了 5 种 tensorflow 读取数据 的办法,后两种 为 工业大数据模型 练习场景下 的 算法利器,强烈引荐。

加上 图上 deepwalk 算法理论与实战,图算法之瑞士军刀篇(一) 文章里运用的 自定义生成 batch 数据 的办法,共有 6种办法 来 适配不同的 业务数据 读取场景了,能够 算是 集 tensorflow 读取数据 的 大成之作了,每一个 末节 的 代码 均能够 独立运转成功,十分 值得保藏!

到这儿, 模型手把手系列开篇 之 tensorflow 六种办法读入batch样本(含序列特征处理), 踩坑经历值得保藏 的 全文 就 写完 了。本文代码 每个模块 均能够 独立跑 成功,总有一款 合适你,期望 能够对你 有 参阅效果 ~


码字不易,觉得有收成就动动小手转载一下吧,你的支撑是我写下去的最大动力 ~

更多更全更新内容,欢迎重视作者的大众号: 算法全栈之路

tensorflow 六种方法构建读入batch样本(含序列特征处理),踩坑经验值得收藏

  • END –