From bf6f9942f3d8d610e1a4bc4509d76e7b8b608e14 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 11 Oct 2017 16:11:24 +0800 Subject: [PATCH 01/20] 1. Support NLP non-distribued training 2. Introduce Kafka to avoid broadcast huge tranning data --- .../estimators/tf_text_file_estimator.py | 312 ++++++++++++++++++ python/sparkdl/param/shared_params.py | 99 +++++- python/sparkdl/tf_fun.py | 90 +++++ python/sparkdl/transformers/named_text.py | 134 ++++++++ python/sparkdl/transformers/tf_image.py | 2 +- python/sparkdl/transformers/tf_text.py | 91 +++++ python/sparkdl/transformers/utils.py | 2 + python/tests/Test.py | 30 ++ python/tests/Test2.py | 22 ++ python/tests/resources/text/sample.txt | 4 + python/tests/transformers/tf_text_test.py | 126 +++++++ 11 files changed, 910 insertions(+), 2 deletions(-) create mode 100644 python/sparkdl/estimators/tf_text_file_estimator.py create mode 100644 python/sparkdl/tf_fun.py create mode 100644 python/sparkdl/transformers/named_text.py create mode 100644 python/sparkdl/transformers/tf_text.py create mode 100644 python/tests/Test.py create mode 100644 python/tests/Test2.py create mode 100644 python/tests/resources/text/sample.txt create mode 100644 python/tests/transformers/tf_text_test.py diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py new file mode 100644 index 00000000..278ab8e5 --- /dev/null +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -0,0 +1,312 @@ +# +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pylint: disable=protected-access +from __future__ import absolute_import, division, print_function + +import logging +import threading +import time +import os +import shutil + +import cPickle as pickle + +from kafka import KafkaConsumer +from kafka import KafkaProducer +from pyspark.ml import Estimator + +from sparkdl.param import ( + keyword_only, HasLabelCol, HasInputCol, HasOutputCol) +from sparkdl.param.shared_params import KafkaParam, FitParam, MapFnParam +import sparkdl.utils.jvmapi as JVMAPI + +__all__ = ['TFTextFileEstimator'] + +logger = logging.getLogger('sparkdl') + + +class TFTextFileEstimator(Estimator, HasInputCol, HasOutputCol, HasLabelCol, KafkaParam, FitParam, MapFnParam): + """ + Build a Estimator from tensorflow or keras when backend is tensorflow. + + First,assume we have data in dataframe like following. + + .. code-block:: python + documentDF = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + transformer = TFTextTransformer( + inputCol=input_col, + outputCol=output_col) + + df = transformer.transform(documentDF) + + TFTextTransformer will transform text column to `output_col`, which is 2-D array. + + Then we create a tensorflow function. + + .. 
code-block:: python + def map_fun(_read_data, **args): + import tensorflow as tf + EMBEDDING_SIZE = args["embedding_size"] + feature = args['feature'] + label = args['label'] + params = args['params']['fitParam'] + SEQUENCE_LENGTH = 64 + + def feed_dict(batch): + # Convert from dict of named arrays to two numpy arrays of the proper type + features = [] + for i in batch: + features.append(i['sentence_matrix']) + + # print("{} {}".format(feature, features)) + return features + + encoder_variables_dict = { + "encoder_w1": tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), + "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), + "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), + "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") + } + + _read_data is a data generator. args provide hyper parameteres configured in this estimator. + + here is how to use _read_data: + + .. code-block:: python + for data in _read_data(max_records=params.batch_size): + batch_data = feed_dict(data) + sess.run(train_step, feed_dict={input_x: batch_data}) + + finally we can create TFTextFileEstimator to train our model: + + .. code-block:: python + estimator = TFTextFileEstimator(inputCol="sentence_matrix", + outputCol="sentence_matrix", labelCol="preds", + kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", + "group_id": "sdl_1"}, + fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], + mapFnParam=map_fun) + estimator.fit(df) + + """ + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, labelCol=None, kafkaParam=None, fitParam=None, mapFnParam=None): + super(TFTextFileEstimator, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, labelCol=None, kafkaParam=None, fitParam=None, mapFnParam=None): + kwargs = self._input_kwargs + return self._set(**kwargs) + + def fit(self, dataset, params=None): + self._validateParams() + if params is None: + paramMaps = self.getFitParam() + elif isinstance(params, (list, tuple)): + if len(params) == 0: + paramMaps = [dict()] + else: + self._validateFitParams(params) + paramMaps = params + elif isinstance(params, dict): + paramMaps = [params] + else: + raise ValueError("Params must be either a param map or a list/tuple of param maps, " + "but got %s." % type(params)) + return self._fitInParallel(dataset, paramMaps) + + def _validateParams(self): + """ + Check Param values so we can throw errors on the driver, rather than workers. 
+ :return: True if parameters are valid + """ + if not self.isDefined(self.inputCol): + raise ValueError("Input column must be defined") + if not self.isDefined(self.outputCol): + raise ValueError("Output column must be defined") + return True + + def _fitInParallel(self, dataset, paramMaps): + + inputCol = self.getInputCol() + labelCol = self.getLabelCol() + + from time import gmtime, strftime + kafaParams = self.getKafkaParam() + topic = kafaParams["topic"] + "_" + strftime("%Y-%m-%d-%H-%M-%S", gmtime()) + group_id = kafaParams["group_id"] + bootstrap_servers = kafaParams["bootstrap_servers"] + kafka_test_mode = kafaParams["test_mode"] if "test_mode" in kafaParams else False + + def _write_data(): + def _write_partition(index, d_iter): + producer = KafkaMockServer(index) if kafka_test_mode else KafkaProducer( + bootstrap_servers=bootstrap_servers) + try: + for d in d_iter: + producer.send(topic, pickle.dumps(d)) + producer.send(topic, pickle.dumps("_stop_")) + producer.flush() + finally: + producer.close() + return [] + + dataset.rdd.mapPartitionsWithIndex(_write_partition).count() + + if kafka_test_mode: + _write_data() + else: + t = threading.Thread(target=_write_data) + t.start() + + stop_flag_num = dataset.rdd.getNumPartitions() + temp_item = dataset.take(1)[0] + vocab_s = temp_item["vocab_size"] + embedding_size = temp_item["embedding_size"] + + sc = JVMAPI._curr_sc() + + paramMapsRDD = sc.parallelize(paramMaps, numSlices=len(paramMaps)) + + # Obtain params for this estimator instance + baseParamMap = self.extractParamMap() + baseParamDict = dict([(param.name, val) for param, val in baseParamMap.items()]) + baseParamDictBc = sc.broadcast(baseParamDict) + + def _local_fit(override_param_map): + # Update params + params = baseParamDictBc.value + params["fitParam"] = override_param_map + + def _read_data(max_records=64): + consumer = KafkaMockServer() if kafka_test_mode else KafkaConsumer(topic, + group_id=group_id, + bootstrap_servers=bootstrap_servers, + auto_offset_reset="earliest", + enable_auto_commit=False + ) + try: + stop_count = 0 + fail_msg_count = 0 + while True: + if kafka_test_mode: + time.sleep(1) + messages = consumer.poll(timeout_ms=1000, max_records=max_records) + group_msgs = [] + for tp, records in messages.items(): + for record in records: + try: + msg_value = pickle.loads(record.value) + if msg_value == "_stop_": + stop_count += 1 + else: + group_msgs.append(msg_value) + except: + fail_msg_count += 0 + pass + if len(group_msgs) > 0: + yield group_msgs + + if kafka_test_mode: + print( + "stop_count = {} " + "group_msgs = {} " + "stop_flag_num = {} " + "fail_msg_count = {}".format(stop_count, + len(group_msgs), + stop_flag_num, + fail_msg_count)) + + if stop_count >= stop_flag_num and len(group_msgs) == 0: + break + finally: + consumer.close() + + self.getMapFnParam()(_read_data, + feature=inputCol, + label=labelCol, + vacab_size=vocab_s, + embedding_size=embedding_size, + params=params + ) + + return paramMapsRDD.map(lambda paramMap: (paramMap, _local_fit(paramMap))) + + def _fit(self, dataset): # pylint: disable=unused-argument + err_msgs = ["This function should not have been called", + "Please contact library maintainers to file a bug"] + raise NotImplementedError('\n'.join(err_msgs)) + + +class KafkaMockServer(object): + """ + Restrictions of KafkaMockServer: + * Make sure all data have been writen before consume. + * Poll function will just ignore max_records and just return all data in queue. 
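    A minimal sketch of the intended round trip (the topic name is ignored by the
    mock and the payload below is purely illustrative):

    .. code-block:: python
        import cPickle as pickle
        from sparkdl.estimators.tf_text_file_estimator import KafkaMockServer

        producer = KafkaMockServer(index=0)
        producer.send("any_topic", pickle.dumps({"sentence_matrix": [[0.0]]}))
        producer.send("any_topic", pickle.dumps("_stop_"))
        producer.flush()   # persists the queue to /tmp/mock-kafka/0
        producer.close()

        consumer = KafkaMockServer()
        messages = consumer.poll(timeout_ms=1000, max_records=64)
        for tp, records in messages.items():
            for record in records:
                print(pickle.loads(record.value))  # the dict and the "_stop_" marker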
+ """ + _kafka_mock_server_tmp_file_ = "/tmp/mock-kafka/" + sended = False + + def __init__(self, index=0): + super(KafkaMockServer, self).__init__() + self.index = index + self.queue = [] + if not os.path.exists(self._kafka_mock_server_tmp_file_): + os.mkdir(self._kafka_mock_server_tmp_file_) + + def send(self, topic, msg): + self.queue.append(pickle.loads(msg)) + + def flush(self): + with open(self._kafka_mock_server_tmp_file_ + str(self.index), "w") as f: + pickle.dump(self.queue, f) + self.queue = [] + + def close(self): + pass + + def poll(self, timeout_ms, max_records): + if self.sended: + return {} + + records = [] + for file in os.listdir(self._kafka_mock_server_tmp_file_): + with open(self._kafka_mock_server_tmp_file_ + file) as f: + tmp = pickle.load(f) + records += tmp + result = {} + couter = 0 + for i in records: + obj = MockRecord() + obj.value = pickle.dumps(i) + couter += 1 + result[str(couter) + "_"] = [obj] + self.sended = True + return result + + +class MockRecord(list): + pass diff --git a/python/sparkdl/param/shared_params.py b/python/sparkdl/param/shared_params.py index e169e891..7305fc8b 100644 --- a/python/sparkdl/param/shared_params.py +++ b/python/sparkdl/param/shared_params.py @@ -27,6 +27,7 @@ import sparkdl.utils.keras_model as kmutil + # From pyspark def keyword_only(func): @@ -36,15 +37,75 @@ def keyword_only(func): .. note:: Should only be used to wrap a method where first arg is `self` """ + @wraps(func) def wrapper(self, *args, **kwargs): if len(args) > 0: raise TypeError("Method %s forces keyword arguments." % func.__name__) self._input_kwargs = kwargs return func(self, **kwargs) + return wrapper +class KafkaParam(Params): + kafkaParam = Param(Params._dummy(), "kafkaParam", "kafka", typeConverter=TypeConverters.identity) + + def __init__(self): + super(KafkaParam, self).__init__() + + def setKafkaParam(self, value): + """ + Sets the value of :py:attr:`inputCol`. + """ + return self._set(kafkaParam=value) + + def getKafkaParam(self): + """ + Gets the value of inputCol or its default value. + """ + return self.getOrDefault(self.kafkaParam) + + +class FitParam(Params): + fitParam = Param(Params._dummy(), "fitParam", "hyper parameter when training", + typeConverter=TypeConverters.identity) + + def __init__(self): + super(FitParam, self).__init__() + + def setFitParam(self, value): + """ + Sets the value of :py:attr:`inputCol`. + """ + return self._set(fitParam=value) + + def getFitParam(self): + """ + Gets the value of inputCol or its default value. + """ + return self.getOrDefault(self.fitParam) + + +class MapFnParam(Params): + mapFnParam = Param(Params._dummy(), "mapFnParam", "Tensorflow func", typeConverter=TypeConverters.identity) + + def __init__(self): + super(MapFnParam, self).__init__() + + def setMapFnParam(self, value): + """ + Sets the value of :py:attr:`inputCol`. + """ + return self._set(mapFnParam=value) + + def getMapFnParam(self): + """ + Gets the value of inputCol or its default value. + """ + return self.getOrDefault(self.mapFnParam) + + class HasInputCol(Params): """ Mixin for param inputCol: input column name. 
@@ -68,6 +129,42 @@ def getInputCol(self): return self.getOrDefault(self.inputCol) +class HasEmbeddingSize(Params): + """ + Mixin for param embeddingSize + """ + + embeddingSize = Param(Params._dummy(), "embeddingSize", "word embedding size", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasEmbeddingSize, self).__init__() + + def setEmbeddingSize(self, value): + return self._set(embeddingSize=value) + + def getEmbeddingSize(self): + return self.getOrDefault(self.embeddingSize) + + +class HasSequenceLength(Params): + """ + Mixin for param sequenceLength + """ + + sequenceLength = Param(Params._dummy(), "sequenceLength", "sequence length", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasSequenceLength, self).__init__() + + def setSequenceLength(self, value): + return self._set(sequenceLength=value) + + def getSequenceLength(self): + return self.getOrDefault(self.sequenceLength) + + class HasOutputCol(Params): """ Mixin for param outputCol: output column name. @@ -92,12 +189,12 @@ def getOutputCol(self): """ return self.getOrDefault(self.outputCol) + ############################################ # New in sparkdl ############################################ class SparkDLTypeConverters(object): - @staticmethod def toStringOrTFTensor(value): if isinstance(value, tf.Tensor): diff --git a/python/sparkdl/tf_fun.py b/python/sparkdl/tf_fun.py new file mode 100644 index 00000000..b870f5f8 --- /dev/null +++ b/python/sparkdl/tf_fun.py @@ -0,0 +1,90 @@ +def map_fun(_read_data, **args): + import tensorflow as tf + EMBEDDING_SIZE = args["embedding_size"] + feature = args['feature'] + label = args['label'] + params = args['params']['fitParam'] + SEQUENCE_LENGTH = 64 + + def feed_dict(batch): + # Convert from dict of named arrays to two numpy arrays of the proper type + features = [] + for i in batch: + features.append(i['sentence_matrix']) + + # print("{} {}".format(feature, features)) + return features + + encoder_variables_dict = { + "encoder_w1": tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), + "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), + "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), + "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") + } + + def encoder(x, name="encoder"): + with tf.name_scope(name): + encoder_w1 = encoder_variables_dict["encoder_w1"] + encoder_b1 = encoder_variables_dict["encoder_b1"] + + layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1) + + encoder_w2 = encoder_variables_dict["encoder_w2"] + encoder_b2 = encoder_variables_dict["encoder_b2"] + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2) + return layer_2 + + def decoder(x, name="decoder"): + with tf.name_scope(name): + decoder_w1 = tf.Variable(tf.random_normal([128, 256])) + decoder_b1 = tf.Variable(tf.random_normal([256])) + + layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1) + + decoder_w2 = tf.Variable( + tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE])) + decoder_b2 = tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE])) + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2) + return layer_2 + + tf.reset_default_graph + sess = tf.Session() + + input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x") + flattened = tf.reshape(input_x, + [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE]) + + encoder_op = encoder(flattened) + + 
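    # Shape walk-through: with the hard-coded SEQUENCE_LENGTH = 64 and the
    # TFTextTransformer default embedding size of 100, input_x is
    # [batch, 64, 100] and flattened is [batch, 6400]. The encoder maps
    # 6400 -> 256 -> 128; the decoder below mirrors it back to [batch, 6400]
    # so y_pred can be compared against y_true = flattened by the cosine
    # reconstruction loss xent = sum(1 - cos(y_pred, y_true)).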
tf.add_to_collection('encoder_op', encoder_op) + + y_pred = decoder(encoder_op) + + y_true = flattened + + with tf.name_scope("xent"): + consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1), + tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)), + tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1)))) + xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine)) + tf.summary.scalar("xent", xent) + + with tf.name_scope("train"): + # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent) + train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent) + + summ = tf.summary.merge_all() + + sess.run(tf.global_variables_initializer()) + + for i in range(params["epochs"]): + print("epoll {}".format(i)) + for data in _read_data(max_records=params["batch_size"]): + batch_data = feed_dict(data) + sess.run(train_step, feed_dict={input_x: batch_data}) + + sess.close() diff --git a/python/sparkdl/transformers/named_text.py b/python/sparkdl/transformers/named_text.py new file mode 100644 index 00000000..ef51cd0c --- /dev/null +++ b/python/sparkdl/transformers/named_text.py @@ -0,0 +1,134 @@ +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from keras.applications.imagenet_utils import decode_predictions +import numpy as np + +from pyspark.ml import Transformer +from pyspark.ml.param import Param, Params, TypeConverters + +import sparkdl.graph.utils as tfx +from sparkdl.image.imageIO import resizeImage +import sparkdl.transformers.keras_applications as keras_apps +from sparkdl.param import ( + keyword_only, HasInputCol, HasOutputCol, SparkDLTypeConverters) +from sparkdl.transformers.tf_text import TFTextTransformer + +SUPPORTED_MODELS = ["CNN", "LSTM"] + + +class DeepTextFeaturizer(Transformer, HasInputCol, HasOutputCol): + """ + todo + """ + modelName = Param(Params._dummy(), "modelName", "A deep learning model name") + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, modelName=None): + """ + __init__(self, inputCol=None, outputCol=None, modelName=None) + """ + super(DeepTextFeaturizer, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, modelName=None): + """ + setParams(self, inputCol=None, outputCol=None, modelName=None) + """ + kwargs = self._input_kwargs + self._set(**kwargs) + return self + + def setModelName(self, value): + return self._set(modelName=value) + + def getModelName(self): + return self.getOrDefault(self.modelName) + + def _transform(self, dataset): + transformer = _NamedTextTransformer(inputCol=self.getInputCol(), + outputCol=self.getOutputCol(), + modelName=self.getModelName(), featurize=True) + return transformer.transform(dataset) + + +class _NamedTextTransformer(Transformer, HasInputCol, HasOutputCol): + modelName = Param(Params._dummy(), "modelName", "A deep learning model name", + typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) + featurize = 
Param(Params._dummy(), "featurize", + "If true, output features. If false, output predictions. Either way the output is a vector.", + typeConverter=TypeConverters.toBoolean) + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False): + """ + __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False) + """ + super(_NamedTextTransformer, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + self._inputTensorName = None + self._outputTensorName = None + self._outputMode = None + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False): + """ + setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False) + """ + kwargs = self._input_kwargs + self._set(**kwargs) + return self + + def setModelName(self, value): + return self._set(modelName=value) + + def getModelName(self): + return self.getOrDefault(self.modelName) + + def setFeaturize(self, value): + return self._set(featurize=value) + + def getFeaturize(self): + return self.getOrDefault(self.featurize) + + def _transform(self, dataset): + modelGraphSpec = _buildTFGraphForName(self.getModelName(), self.getFeaturize()) + inputCol = self.getInputCol() + resizedCol = "__sdl_textResized" + tfTransformer = TFTextTransformer(inputCol=resizedCol, + outputCol=self.getOutputCol(), + graph=modelGraphSpec["graph"], + inputTensor=modelGraphSpec["inputTensorName"], + outputTensor=modelGraphSpec["outputTensorName"], + outputMode=modelGraphSpec["outputMode"]) + resizeUdf = resizeImage(modelGraphSpec["inputTensorSize"]) + result = tfTransformer.transform(dataset.withColumn(resizedCol, resizeUdf(inputCol))) + return result.drop(resizedCol) + + +def _buildTFGraphForName(name, featurize): + """ + Currently only supports pre-trained models from the Keras applications module. + """ + modelData = keras_apps.getKerasApplicationModel(name).getModelData(featurize) + sess = modelData["session"] + outputTensorName = modelData["outputTensorName"] + graph = tfx.strip_and_freeze_until([outputTensorName], sess.graph, sess, return_graph=True) + modelData["graph"] = graph + + return modelData diff --git a/python/sparkdl/transformers/tf_image.py b/python/sparkdl/transformers/tf_image.py index da37fcad..2ca33846 100644 --- a/python/sparkdl/transformers/tf_image.py +++ b/python/sparkdl/transformers/tf_image.py @@ -120,7 +120,7 @@ def _transform(self, dataset): with final_graph.as_default(): image = dataset[self.getInputCol()] image_df_exploded = (dataset - .withColumn("__sdl_image_height", image.height) + .n("__sdl_image_height", image.height) .withColumn("__sdl_image_width", image.width) .withColumn("__sdl_image_nchannels", image.nChannels) .withColumn("__sdl_image_data", image.data) diff --git a/python/sparkdl/transformers/tf_text.py b/python/sparkdl/transformers/tf_text.py new file mode 100644 index 00000000..b040adc0 --- /dev/null +++ b/python/sparkdl/transformers/tf_text.py @@ -0,0 +1,91 @@ +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import numpy as np +from pyspark.ml import Transformer +from pyspark.ml.feature import Word2Vec +from pyspark.sql.functions import udf +from pyspark.sql import functions as f +from pyspark.sql.types import * +from pyspark.sql.functions import lit +from sparkdl.param.shared_params import HasEmbeddingSize, HasSequenceLength +from sparkdl.param import ( + keyword_only, HasInputCol, HasOutputCol) +import re + +import sparkdl.utils.jvmapi as JVMAPI + + +class TFTextTransformer(Transformer, HasInputCol, HasOutputCol, HasEmbeddingSize, HasSequenceLength): + """ + Convert sentence/document to a 2-D Array eg. [[word embedding],[....]] in DataFrame which can be processed + directly by tensorflow or keras who's backend is tensorflow. + + Processing Steps: + + * Using Word2Vec compute Map(word -> vector) from input column, then broadcast the map. + * Process input column (which is text),split it with white space, replace word with vector, padding the result to + the same size. + * Create a new dataframe with columns like new 2-D array , vocab_size, embedding_size + * return then new dataframe + """ + VOCAB_SIZE = 'vocab_size' + EMBEDDING_SIZE = 'embedding_size' + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, embeddingSize=100, sequenceLength=64): + super(TFTextTransformer, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, embeddingSize=100, sequenceLength=64): + kwargs = self._input_kwargs + return self._set(**kwargs) + + def _transform(self, dataset): + word2vec = Word2Vec(vectorSize=self.getEmbeddingSize(), minCount=1, inputCol=self.getInputCol(), + outputCol="word_embedding") + word_embedding = dict( + word2vec.fit( + dataset.select(f.split(self.getInputCol(), "\\s+").alias(self.getInputCol()))).getVectors().rdd.map( + lambda p: (p.word, p.vector.values.tolist())).collect()) + word_embedding["unk"] = np.zeros(self.getEmbeddingSize()).tolist() + sc = JVMAPI._curr_sc() + local_word_embedding = sc.broadcast(word_embedding) + + def convert_word_to_index(s): + def _pad_sequences(sequences, maxlen=None): + new_sequences = [] + + if len(sequences) <= maxlen: + for i in range(maxlen - len(sequences)): + new_sequences.append(np.zeros(self.getEmbeddingSize()).tolist()) + return sequences + new_sequences + else: + return sequences[0:maxlen] + + new_q = [local_word_embedding.value[word] for word in re.split(r"\s+", s) if + word in local_word_embedding.value.keys()] + result = _pad_sequences(new_q, maxlen=self.getSequenceLength()) + return result + + cwti_udf = udf(convert_word_to_index, ArrayType(ArrayType(FloatType()))) + doc_martic = (dataset.withColumn(self.getOutputCol(), cwti_udf(self.getInputCol()).alias(self.getOutputCol())) + .withColumn(self.VOCAB_SIZE, lit(len(word_embedding))) + .withColumn(self.EMBEDDING_SIZE, lit(self.getEmbeddingSize())) + ) + + return doc_martic diff --git a/python/sparkdl/transformers/utils.py b/python/sparkdl/transformers/utils.py index b244365b..9964f3df 100644 --- a/python/sparkdl/transformers/utils.py +++ b/python/sparkdl/transformers/utils.py @@ -18,6 +18,8 @@ # image stuff IMAGE_INPUT_PLACEHOLDER_NAME = "sparkdl_image_input" +TEXT_INPUT_PLACEHOLDER_NAME = "sparkdl_text_input" + def imageInputPlaceholder(nChannels=None): return tf.placeholder(tf.float32, [None, None, None, nChannels], diff --git a/python/tests/Test.py b/python/tests/Test.py new file 
mode 100644 index 00000000..6327cda4 --- /dev/null +++ b/python/tests/Test.py @@ -0,0 +1,30 @@ +import os +os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' + +from sparkdl import readImages +from pyspark.sql.functions import lit +from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.ml.classification import LogisticRegression +from pyspark.ml import Pipeline +from sparkdl import DeepImageFeaturizer + +img_dir="/Users/allwefantasy/resources/images/flower_photos" + +tulips_df = readImages(img_dir + "/tulips").withColumn("label", lit(1)) +daisy_df = readImages(img_dir + "/daisy").withColumn("label", lit(0)) + +tulips_train, tulips_test = tulips_df.randomSplit([0.6, 0.4]) +daisy_train, daisy_test = daisy_df.randomSplit([0.6, 0.4]) +train_df = tulips_train.unionAll(daisy_train) +test_df = tulips_test.unionAll(daisy_test) + +featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3") +lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label") +p = Pipeline(stages=[featurizer, lr]) + +p_model = p.fit(train_df) +tested_df = p_model.transform(test_df) +evaluator = MulticlassClassificationEvaluator(metricName="accuracy") +print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label")))) + +# h5py,pil \ No newline at end of file diff --git a/python/tests/Test2.py b/python/tests/Test2.py new file mode 100644 index 00000000..b535a602 --- /dev/null +++ b/python/tests/Test2.py @@ -0,0 +1,22 @@ +import os +from pyspark import SparkContext + +from sparkdl.transformers.tf_text import TFTextTransformer + +os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' + +input_col = "text" +output_col = "preds" + +sc = SparkContext.getOrCreate() +documentDF = sc.createDataFrame([ + ("Hi I heard about Spark".split(" "), 1), + ("I wish Java could use case classes".split(" "), 0), + ("Logistic regression models are neat".split(" "), 2) +], ["text", "preds"]) + +transformer = TFTextTransformer( + inputCol=input_col, outputCol=output_col) + +df = transformer.transform(documentDF) +df.show() \ No newline at end of file diff --git a/python/tests/resources/text/sample.txt b/python/tests/resources/text/sample.txt new file mode 100644 index 00000000..8c5e8d99 --- /dev/null +++ b/python/tests/resources/text/sample.txt @@ -0,0 +1,4 @@ +接下 来 介绍 一种 非常 重要 的 神经网络 卷积神经网络 +这种 神经 网络 在 计算机 视觉 领域 取得了 重大 的 成功,而且 在 自然语言 处理 等 其它 领域 也有 很好 应用 +深度学习 受到 大家 关注 很大 一个 原因 就是 Alex 实现 AlexNet( 一种 深度卷积神经网络 )在 LSVRC-2010 ImageNet +此后 卷积神经网络 及其 变种 被广泛 应用于 各种图像 相关 任务 \ No newline at end of file diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py new file mode 100644 index 00000000..0e8b359d --- /dev/null +++ b/python/tests/transformers/tf_text_test.py @@ -0,0 +1,126 @@ +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import cPickle as pickle +import shutil +import threading + +from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator, KafkaMockServer +from sparkdl.transformers.tf_text import TFTextTransformer +from sparkdl.tf_fun import map_fun +from ..tests import SparkDLTestCase + + +class TFTextTransformerTest(SparkDLTestCase): + def test_convertText(self): + input_col = "text" + output_col = "sentence_matrix" + + documentDF = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + # transform text column to sentence_matrix column which contains 2-D array. + transformer = TFTextTransformer( + inputCol=input_col, outputCol=output_col, embeddingSize=100, sequenceLength=64) + + df = transformer.transform(documentDF) + data = df.collect() + self.assertEquals(len(data), 3) + for row in data: + self.assertEqual(len(row[output_col]), 64) + self.assertEqual(len(row[output_col][0]), 100) + + +class TFTextFileEstimatorTest(SparkDLTestCase): + def test_trainText(self): + import os + if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): + shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) + + input_col = "text" + output_col = "sentence_matrix" + + documentDF = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + # transform text column to sentence_matrix column which contains 2-D array. + transformer = TFTextTransformer( + inputCol=input_col, outputCol=output_col, embeddingSize=100, sequenceLength=64) + + df = transformer.transform(documentDF) + + # create a estimator to training where map_fun contains tensorflow's code + estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds", + kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", + "group_id": "sdl_1", "test_mode": False}, + fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], + mapFnParam=map_fun) + estimator.fit(df).collect() + + +class MockKakfaServerTest(SparkDLTestCase): + def test_mockKafkaServerProduce(self): + dataset = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + def _write_data(): + def _write_partition(index, d_iter): + producer = KafkaMockServer(index) + try: + for d in d_iter: + producer.send("", pickle.dumps(d)) + producer.send("", pickle.dumps("_stop_")) + producer.flush() + finally: + producer.close() + return [] + + dataset.rdd.mapPartitionsWithIndex(_write_partition).count() + + _write_data() + + def _consume(): + consumer = KafkaMockServer() + stop_count = 0 + while True: + messages = consumer.poll(timeout_ms=1000, max_records=64) + group_msgs = [] + for tp, records in messages.items(): + for record in records: + try: + msg_value = pickle.loads(record.value) + print(msg_value) + if msg_value == "_stop_": + stop_count += 1 + else: + group_msgs.append(msg_value) + except: + pass + if stop_count >= 8: + break + self.assertEquals(stop_count, 8) + + t = threading.Thread(target=_consume) + t.start() + t2 = threading.Thread(target=_consume) + t2.start() From 3c3fd2dce20822f86bd112ed58260538980dce40 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Fri, 13 Oct 2017 17:28:39 +0800 Subject: [PATCH 02/20] set test_mode to True which can avoid 
to kafka dependency --- python/tests/transformers/tf_text_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 0e8b359d..26f31d1f 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -69,7 +69,7 @@ def test_trainText(self): # create a estimator to training where map_fun contains tensorflow's code estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds", kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", - "group_id": "sdl_1", "test_mode": False}, + "group_id": "sdl_1", "test_mode": True}, fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], mapFnParam=map_fun) estimator.fit(df).collect() From e0cdad282a97a0c4541e2f8714bb5ee456b9fd70 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Fri, 13 Oct 2017 18:44:23 +0800 Subject: [PATCH 03/20] clean some file --- python/sparkdl/transformers/named_text.py | 134 ---------------------- python/sparkdl/transformers/tf_image.py | 2 +- python/tests/Test.py | 30 ----- python/tests/Test2.py | 22 ---- 4 files changed, 1 insertion(+), 187 deletions(-) delete mode 100644 python/sparkdl/transformers/named_text.py delete mode 100644 python/tests/Test.py delete mode 100644 python/tests/Test2.py diff --git a/python/sparkdl/transformers/named_text.py b/python/sparkdl/transformers/named_text.py deleted file mode 100644 index ef51cd0c..00000000 --- a/python/sparkdl/transformers/named_text.py +++ /dev/null @@ -1,134 +0,0 @@ -# Copyright 2017 Databricks, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -from keras.applications.imagenet_utils import decode_predictions -import numpy as np - -from pyspark.ml import Transformer -from pyspark.ml.param import Param, Params, TypeConverters - -import sparkdl.graph.utils as tfx -from sparkdl.image.imageIO import resizeImage -import sparkdl.transformers.keras_applications as keras_apps -from sparkdl.param import ( - keyword_only, HasInputCol, HasOutputCol, SparkDLTypeConverters) -from sparkdl.transformers.tf_text import TFTextTransformer - -SUPPORTED_MODELS = ["CNN", "LSTM"] - - -class DeepTextFeaturizer(Transformer, HasInputCol, HasOutputCol): - """ - todo - """ - modelName = Param(Params._dummy(), "modelName", "A deep learning model name") - - @keyword_only - def __init__(self, inputCol=None, outputCol=None, modelName=None): - """ - __init__(self, inputCol=None, outputCol=None, modelName=None) - """ - super(DeepTextFeaturizer, self).__init__() - kwargs = self._input_kwargs - self.setParams(**kwargs) - - @keyword_only - def setParams(self, inputCol=None, outputCol=None, modelName=None): - """ - setParams(self, inputCol=None, outputCol=None, modelName=None) - """ - kwargs = self._input_kwargs - self._set(**kwargs) - return self - - def setModelName(self, value): - return self._set(modelName=value) - - def getModelName(self): - return self.getOrDefault(self.modelName) - - def _transform(self, dataset): - transformer = _NamedTextTransformer(inputCol=self.getInputCol(), - outputCol=self.getOutputCol(), - modelName=self.getModelName(), featurize=True) - return transformer.transform(dataset) - - -class _NamedTextTransformer(Transformer, HasInputCol, HasOutputCol): - modelName = Param(Params._dummy(), "modelName", "A deep learning model name", - typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) - featurize = Param(Params._dummy(), "featurize", - "If true, output features. If false, output predictions. 
Either way the output is a vector.", - typeConverter=TypeConverters.toBoolean) - - @keyword_only - def __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False): - """ - __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False) - """ - super(_NamedTextTransformer, self).__init__() - kwargs = self._input_kwargs - self.setParams(**kwargs) - self._inputTensorName = None - self._outputTensorName = None - self._outputMode = None - - @keyword_only - def setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False): - """ - setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False) - """ - kwargs = self._input_kwargs - self._set(**kwargs) - return self - - def setModelName(self, value): - return self._set(modelName=value) - - def getModelName(self): - return self.getOrDefault(self.modelName) - - def setFeaturize(self, value): - return self._set(featurize=value) - - def getFeaturize(self): - return self.getOrDefault(self.featurize) - - def _transform(self, dataset): - modelGraphSpec = _buildTFGraphForName(self.getModelName(), self.getFeaturize()) - inputCol = self.getInputCol() - resizedCol = "__sdl_textResized" - tfTransformer = TFTextTransformer(inputCol=resizedCol, - outputCol=self.getOutputCol(), - graph=modelGraphSpec["graph"], - inputTensor=modelGraphSpec["inputTensorName"], - outputTensor=modelGraphSpec["outputTensorName"], - outputMode=modelGraphSpec["outputMode"]) - resizeUdf = resizeImage(modelGraphSpec["inputTensorSize"]) - result = tfTransformer.transform(dataset.withColumn(resizedCol, resizeUdf(inputCol))) - return result.drop(resizedCol) - - -def _buildTFGraphForName(name, featurize): - """ - Currently only supports pre-trained models from the Keras applications module. 
- """ - modelData = keras_apps.getKerasApplicationModel(name).getModelData(featurize) - sess = modelData["session"] - outputTensorName = modelData["outputTensorName"] - graph = tfx.strip_and_freeze_until([outputTensorName], sess.graph, sess, return_graph=True) - modelData["graph"] = graph - - return modelData diff --git a/python/sparkdl/transformers/tf_image.py b/python/sparkdl/transformers/tf_image.py index 2ca33846..da37fcad 100644 --- a/python/sparkdl/transformers/tf_image.py +++ b/python/sparkdl/transformers/tf_image.py @@ -120,7 +120,7 @@ def _transform(self, dataset): with final_graph.as_default(): image = dataset[self.getInputCol()] image_df_exploded = (dataset - .n("__sdl_image_height", image.height) + .withColumn("__sdl_image_height", image.height) .withColumn("__sdl_image_width", image.width) .withColumn("__sdl_image_nchannels", image.nChannels) .withColumn("__sdl_image_data", image.data) diff --git a/python/tests/Test.py b/python/tests/Test.py deleted file mode 100644 index 6327cda4..00000000 --- a/python/tests/Test.py +++ /dev/null @@ -1,30 +0,0 @@ -import os -os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' - -from sparkdl import readImages -from pyspark.sql.functions import lit -from pyspark.ml.evaluation import MulticlassClassificationEvaluator -from pyspark.ml.classification import LogisticRegression -from pyspark.ml import Pipeline -from sparkdl import DeepImageFeaturizer - -img_dir="/Users/allwefantasy/resources/images/flower_photos" - -tulips_df = readImages(img_dir + "/tulips").withColumn("label", lit(1)) -daisy_df = readImages(img_dir + "/daisy").withColumn("label", lit(0)) - -tulips_train, tulips_test = tulips_df.randomSplit([0.6, 0.4]) -daisy_train, daisy_test = daisy_df.randomSplit([0.6, 0.4]) -train_df = tulips_train.unionAll(daisy_train) -test_df = tulips_test.unionAll(daisy_test) - -featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3") -lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label") -p = Pipeline(stages=[featurizer, lr]) - -p_model = p.fit(train_df) -tested_df = p_model.transform(test_df) -evaluator = MulticlassClassificationEvaluator(metricName="accuracy") -print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label")))) - -# h5py,pil \ No newline at end of file diff --git a/python/tests/Test2.py b/python/tests/Test2.py deleted file mode 100644 index b535a602..00000000 --- a/python/tests/Test2.py +++ /dev/null @@ -1,22 +0,0 @@ -import os -from pyspark import SparkContext - -from sparkdl.transformers.tf_text import TFTextTransformer - -os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' - -input_col = "text" -output_col = "preds" - -sc = SparkContext.getOrCreate() -documentDF = sc.createDataFrame([ - ("Hi I heard about Spark".split(" "), 1), - ("I wish Java could use case classes".split(" "), 0), - ("Logistic regression models are neat".split(" "), 2) -], ["text", "preds"]) - -transformer = TFTextTransformer( - inputCol=input_col, outputCol=output_col) - -df = transformer.transform(documentDF) -df.show() \ No newline at end of file From 4e8b11ed1d686c2bf859be2b8519e8f45d7fa0a6 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 10:25:48 +0800 Subject: [PATCH 04/20] move tensorflow map_fun to tf_text_test.py and modify the signature to support integrating TFoS infuture --- python/requirements.txt | 1 + .../estimators/tf_text_file_estimator.py | 11 +-- 
python/sparkdl/tf_fun.py | 90 ------------------ python/tests/transformers/tf_text_test.py | 93 ++++++++++++++++++- 4 files changed, 97 insertions(+), 98 deletions(-) delete mode 100644 python/sparkdl/tf_fun.py diff --git a/python/requirements.txt b/python/requirements.txt index a98a4d17..39981df5 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -9,3 +9,4 @@ pygments>=2.2.0 tensorflow==1.3.0 pandas>=0.19.1 six>=1.10.0 +kafka-python>=1.3.5 diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index 278ab8e5..1f2fb116 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -245,12 +245,11 @@ def _read_data(max_records=64): finally: consumer.close() - self.getMapFnParam()(_read_data, - feature=inputCol, - label=labelCol, - vacab_size=vocab_s, - embedding_size=embedding_size, - params=params + self.getMapFnParam()(args={"feature": inputCol, + "label": labelCol, + "vacab_size": vocab_s, + "embedding_size": embedding_size, + "params": params}, ctx=None, _read_data=_read_data, ) return paramMapsRDD.map(lambda paramMap: (paramMap, _local_fit(paramMap))) diff --git a/python/sparkdl/tf_fun.py b/python/sparkdl/tf_fun.py deleted file mode 100644 index b870f5f8..00000000 --- a/python/sparkdl/tf_fun.py +++ /dev/null @@ -1,90 +0,0 @@ -def map_fun(_read_data, **args): - import tensorflow as tf - EMBEDDING_SIZE = args["embedding_size"] - feature = args['feature'] - label = args['label'] - params = args['params']['fitParam'] - SEQUENCE_LENGTH = 64 - - def feed_dict(batch): - # Convert from dict of named arrays to two numpy arrays of the proper type - features = [] - for i in batch: - features.append(i['sentence_matrix']) - - # print("{} {}".format(feature, features)) - return features - - encoder_variables_dict = { - "encoder_w1": tf.Variable( - tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), - "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), - "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), - "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") - } - - def encoder(x, name="encoder"): - with tf.name_scope(name): - encoder_w1 = encoder_variables_dict["encoder_w1"] - encoder_b1 = encoder_variables_dict["encoder_b1"] - - layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1) - - encoder_w2 = encoder_variables_dict["encoder_w2"] - encoder_b2 = encoder_variables_dict["encoder_b2"] - - layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2) - return layer_2 - - def decoder(x, name="decoder"): - with tf.name_scope(name): - decoder_w1 = tf.Variable(tf.random_normal([128, 256])) - decoder_b1 = tf.Variable(tf.random_normal([256])) - - layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1) - - decoder_w2 = tf.Variable( - tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE])) - decoder_b2 = tf.Variable( - tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE])) - - layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2) - return layer_2 - - tf.reset_default_graph - sess = tf.Session() - - input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x") - flattened = tf.reshape(input_x, - [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE]) - - encoder_op = encoder(flattened) - - tf.add_to_collection('encoder_op', encoder_op) - - y_pred = decoder(encoder_op) - - y_true = flattened - - with tf.name_scope("xent"): 
- consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1), - tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)), - tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1)))) - xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine)) - tf.summary.scalar("xent", xent) - - with tf.name_scope("train"): - # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent) - train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent) - - summ = tf.summary.merge_all() - - sess.run(tf.global_variables_initializer()) - - for i in range(params["epochs"]): - print("epoll {}".format(i)) - for data in _read_data(max_records=params["batch_size"]): - batch_data = feed_dict(data) - sess.run(train_step, feed_dict={input_x: batch_data}) - - sess.close() diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 26f31d1f..25cb1e0d 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -18,10 +18,99 @@ from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator, KafkaMockServer from sparkdl.transformers.tf_text import TFTextTransformer -from sparkdl.tf_fun import map_fun from ..tests import SparkDLTestCase +def map_fun(args={}, ctx=None, _read_data=None): + import tensorflow as tf + EMBEDDING_SIZE = args["embedding_size"] + params = args['params']['fitParam'] + SEQUENCE_LENGTH = 64 + + def feed_dict(batch): + # Convert from dict of named arrays to two numpy arrays of the proper type + features = [] + for i in batch: + features.append(i['sentence_matrix']) + + # print("{} {}".format(feature, features)) + return features + + encoder_variables_dict = { + "encoder_w1": tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), + "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), + "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), + "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") + } + + def encoder(x, name="encoder"): + with tf.name_scope(name): + encoder_w1 = encoder_variables_dict["encoder_w1"] + encoder_b1 = encoder_variables_dict["encoder_b1"] + + layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1) + + encoder_w2 = encoder_variables_dict["encoder_w2"] + encoder_b2 = encoder_variables_dict["encoder_b2"] + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2) + return layer_2 + + def decoder(x, name="decoder"): + with tf.name_scope(name): + decoder_w1 = tf.Variable(tf.random_normal([128, 256])) + decoder_b1 = tf.Variable(tf.random_normal([256])) + + layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1) + + decoder_w2 = tf.Variable( + tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE])) + decoder_b2 = tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE])) + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2) + return layer_2 + + tf.reset_default_graph + sess = tf.Session() + + input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x") + flattened = tf.reshape(input_x, + [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE]) + + encoder_op = encoder(flattened) + + tf.add_to_collection('encoder_op', encoder_op) + + y_pred = decoder(encoder_op) + + y_true = flattened + + with tf.name_scope("xent"): + consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1), + tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)), + tf.sqrt(tf.reduce_sum(tf.multiply(y_true, 
y_true), 1)))) + xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine)) + tf.summary.scalar("xent", xent) + + with tf.name_scope("train"): + # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent) + train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent) + + summ = tf.summary.merge_all() + + sess.run(tf.global_variables_initializer()) + + for i in range(params["epochs"]): + print("epoll {}".format(i)) + for data in _read_data(max_records=params["batch_size"]): + batch_data = feed_dict(data) + sess.run(train_step, feed_dict={input_x: batch_data}) + + sess.close() + + class TFTextTransformerTest(SparkDLTestCase): def test_convertText(self): input_col = "text" @@ -48,7 +137,7 @@ def test_convertText(self): class TFTextFileEstimatorTest(SparkDLTestCase): def test_trainText(self): import os - if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): + if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) input_col = "text" From 15a0c400df7586516a6e03c652279ab80d011f0f Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 11 Oct 2017 16:11:24 +0800 Subject: [PATCH 05/20] 1. Support NLP non-distribued training 2. Introduce Kafka to avoid broadcast huge tranning data --- .../estimators/tf_text_file_estimator.py | 312 ++++++++++++++++++ python/sparkdl/param/shared_params.py | 99 +++++- python/sparkdl/tf_fun.py | 90 +++++ python/sparkdl/transformers/named_text.py | 134 ++++++++ python/sparkdl/transformers/tf_image.py | 2 +- python/sparkdl/transformers/tf_text.py | 91 +++++ python/sparkdl/transformers/utils.py | 2 + python/tests/Test.py | 30 ++ python/tests/Test2.py | 22 ++ python/tests/resources/text/sample.txt | 4 + python/tests/transformers/tf_text_test.py | 126 +++++++ 11 files changed, 910 insertions(+), 2 deletions(-) create mode 100644 python/sparkdl/estimators/tf_text_file_estimator.py create mode 100644 python/sparkdl/tf_fun.py create mode 100644 python/sparkdl/transformers/named_text.py create mode 100644 python/sparkdl/transformers/tf_text.py create mode 100644 python/tests/Test.py create mode 100644 python/tests/Test2.py create mode 100644 python/tests/resources/text/sample.txt create mode 100644 python/tests/transformers/tf_text_test.py diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py new file mode 100644 index 00000000..278ab8e5 --- /dev/null +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -0,0 +1,312 @@ +# +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pylint: disable=protected-access +from __future__ import absolute_import, division, print_function + +import logging +import threading +import time +import os +import shutil + +import cPickle as pickle + +from kafka import KafkaConsumer +from kafka import KafkaProducer +from pyspark.ml import Estimator + +from sparkdl.param import ( + keyword_only, HasLabelCol, HasInputCol, HasOutputCol) +from sparkdl.param.shared_params import KafkaParam, FitParam, MapFnParam +import sparkdl.utils.jvmapi as JVMAPI + +__all__ = ['TFTextFileEstimator'] + +logger = logging.getLogger('sparkdl') + + +class TFTextFileEstimator(Estimator, HasInputCol, HasOutputCol, HasLabelCol, KafkaParam, FitParam, MapFnParam): + """ + Build a Estimator from tensorflow or keras when backend is tensorflow. + + First,assume we have data in dataframe like following. + + .. code-block:: python + documentDF = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + transformer = TFTextTransformer( + inputCol=input_col, + outputCol=output_col) + + df = transformer.transform(documentDF) + + TFTextTransformer will transform text column to `output_col`, which is 2-D array. + + Then we create a tensorflow function. + + .. code-block:: python + def map_fun(_read_data, **args): + import tensorflow as tf + EMBEDDING_SIZE = args["embedding_size"] + feature = args['feature'] + label = args['label'] + params = args['params']['fitParam'] + SEQUENCE_LENGTH = 64 + + def feed_dict(batch): + # Convert from dict of named arrays to two numpy arrays of the proper type + features = [] + for i in batch: + features.append(i['sentence_matrix']) + + # print("{} {}".format(feature, features)) + return features + + encoder_variables_dict = { + "encoder_w1": tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), + "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), + "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), + "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") + } + + _read_data is a data generator. args provide hyper parameteres configured in this estimator. + + here is how to use _read_data: + + .. code-block:: python + for data in _read_data(max_records=params.batch_size): + batch_data = feed_dict(data) + sess.run(train_step, feed_dict={input_x: batch_data}) + + finally we can create TFTextFileEstimator to train our model: + + .. 
code-block:: python + estimator = TFTextFileEstimator(inputCol="sentence_matrix", + outputCol="sentence_matrix", labelCol="preds", + kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", + "group_id": "sdl_1"}, + fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], + mapFnParam=map_fun) + estimator.fit(df) + + """ + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, labelCol=None, kafkaParam=None, fitParam=None, mapFnParam=None): + super(TFTextFileEstimator, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, labelCol=None, kafkaParam=None, fitParam=None, mapFnParam=None): + kwargs = self._input_kwargs + return self._set(**kwargs) + + def fit(self, dataset, params=None): + self._validateParams() + if params is None: + paramMaps = self.getFitParam() + elif isinstance(params, (list, tuple)): + if len(params) == 0: + paramMaps = [dict()] + else: + self._validateFitParams(params) + paramMaps = params + elif isinstance(params, dict): + paramMaps = [params] + else: + raise ValueError("Params must be either a param map or a list/tuple of param maps, " + "but got %s." % type(params)) + return self._fitInParallel(dataset, paramMaps) + + def _validateParams(self): + """ + Check Param values so we can throw errors on the driver, rather than workers. + :return: True if parameters are valid + """ + if not self.isDefined(self.inputCol): + raise ValueError("Input column must be defined") + if not self.isDefined(self.outputCol): + raise ValueError("Output column must be defined") + return True + + def _fitInParallel(self, dataset, paramMaps): + + inputCol = self.getInputCol() + labelCol = self.getLabelCol() + + from time import gmtime, strftime + kafaParams = self.getKafkaParam() + topic = kafaParams["topic"] + "_" + strftime("%Y-%m-%d-%H-%M-%S", gmtime()) + group_id = kafaParams["group_id"] + bootstrap_servers = kafaParams["bootstrap_servers"] + kafka_test_mode = kafaParams["test_mode"] if "test_mode" in kafaParams else False + + def _write_data(): + def _write_partition(index, d_iter): + producer = KafkaMockServer(index) if kafka_test_mode else KafkaProducer( + bootstrap_servers=bootstrap_servers) + try: + for d in d_iter: + producer.send(topic, pickle.dumps(d)) + producer.send(topic, pickle.dumps("_stop_")) + producer.flush() + finally: + producer.close() + return [] + + dataset.rdd.mapPartitionsWithIndex(_write_partition).count() + + if kafka_test_mode: + _write_data() + else: + t = threading.Thread(target=_write_data) + t.start() + + stop_flag_num = dataset.rdd.getNumPartitions() + temp_item = dataset.take(1)[0] + vocab_s = temp_item["vocab_size"] + embedding_size = temp_item["embedding_size"] + + sc = JVMAPI._curr_sc() + + paramMapsRDD = sc.parallelize(paramMaps, numSlices=len(paramMaps)) + + # Obtain params for this estimator instance + baseParamMap = self.extractParamMap() + baseParamDict = dict([(param.name, val) for param, val in baseParamMap.items()]) + baseParamDictBc = sc.broadcast(baseParamDict) + + def _local_fit(override_param_map): + # Update params + params = baseParamDictBc.value + params["fitParam"] = override_param_map + + def _read_data(max_records=64): + consumer = KafkaMockServer() if kafka_test_mode else KafkaConsumer(topic, + group_id=group_id, + bootstrap_servers=bootstrap_servers, + auto_offset_reset="earliest", + enable_auto_commit=False + ) + try: + stop_count = 0 + fail_msg_count = 0 + while True: + if kafka_test_mode: + 
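                        # Test mode: pause between polls. KafkaMockServer.poll() returns the
                        # whole queue on its first call and an empty dict afterwards, so the
                        # loop breaks on the first empty poll after every partition's
                        # "_stop_" marker has been counted.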
time.sleep(1) + messages = consumer.poll(timeout_ms=1000, max_records=max_records) + group_msgs = [] + for tp, records in messages.items(): + for record in records: + try: + msg_value = pickle.loads(record.value) + if msg_value == "_stop_": + stop_count += 1 + else: + group_msgs.append(msg_value) + except: + fail_msg_count += 0 + pass + if len(group_msgs) > 0: + yield group_msgs + + if kafka_test_mode: + print( + "stop_count = {} " + "group_msgs = {} " + "stop_flag_num = {} " + "fail_msg_count = {}".format(stop_count, + len(group_msgs), + stop_flag_num, + fail_msg_count)) + + if stop_count >= stop_flag_num and len(group_msgs) == 0: + break + finally: + consumer.close() + + self.getMapFnParam()(_read_data, + feature=inputCol, + label=labelCol, + vacab_size=vocab_s, + embedding_size=embedding_size, + params=params + ) + + return paramMapsRDD.map(lambda paramMap: (paramMap, _local_fit(paramMap))) + + def _fit(self, dataset): # pylint: disable=unused-argument + err_msgs = ["This function should not have been called", + "Please contact library maintainers to file a bug"] + raise NotImplementedError('\n'.join(err_msgs)) + + +class KafkaMockServer(object): + """ + Restrictions of KafkaMockServer: + * Make sure all data have been writen before consume. + * Poll function will just ignore max_records and just return all data in queue. + """ + _kafka_mock_server_tmp_file_ = "/tmp/mock-kafka/" + sended = False + + def __init__(self, index=0): + super(KafkaMockServer, self).__init__() + self.index = index + self.queue = [] + if not os.path.exists(self._kafka_mock_server_tmp_file_): + os.mkdir(self._kafka_mock_server_tmp_file_) + + def send(self, topic, msg): + self.queue.append(pickle.loads(msg)) + + def flush(self): + with open(self._kafka_mock_server_tmp_file_ + str(self.index), "w") as f: + pickle.dump(self.queue, f) + self.queue = [] + + def close(self): + pass + + def poll(self, timeout_ms, max_records): + if self.sended: + return {} + + records = [] + for file in os.listdir(self._kafka_mock_server_tmp_file_): + with open(self._kafka_mock_server_tmp_file_ + file) as f: + tmp = pickle.load(f) + records += tmp + result = {} + couter = 0 + for i in records: + obj = MockRecord() + obj.value = pickle.dumps(i) + couter += 1 + result[str(couter) + "_"] = [obj] + self.sended = True + return result + + +class MockRecord(list): + pass diff --git a/python/sparkdl/param/shared_params.py b/python/sparkdl/param/shared_params.py index e169e891..7305fc8b 100644 --- a/python/sparkdl/param/shared_params.py +++ b/python/sparkdl/param/shared_params.py @@ -27,6 +27,7 @@ import sparkdl.utils.keras_model as kmutil + # From pyspark def keyword_only(func): @@ -36,15 +37,75 @@ def keyword_only(func): .. note:: Should only be used to wrap a method where first arg is `self` """ + @wraps(func) def wrapper(self, *args, **kwargs): if len(args) > 0: raise TypeError("Method %s forces keyword arguments." % func.__name__) self._input_kwargs = kwargs return func(self, **kwargs) + return wrapper +class KafkaParam(Params): + kafkaParam = Param(Params._dummy(), "kafkaParam", "kafka", typeConverter=TypeConverters.identity) + + def __init__(self): + super(KafkaParam, self).__init__() + + def setKafkaParam(self, value): + """ + Sets the value of :py:attr:`inputCol`. + """ + return self._set(kafkaParam=value) + + def getKafkaParam(self): + """ + Gets the value of inputCol or its default value. 
+ """ + return self.getOrDefault(self.kafkaParam) + + +class FitParam(Params): + fitParam = Param(Params._dummy(), "fitParam", "hyper parameter when training", + typeConverter=TypeConverters.identity) + + def __init__(self): + super(FitParam, self).__init__() + + def setFitParam(self, value): + """ + Sets the value of :py:attr:`inputCol`. + """ + return self._set(fitParam=value) + + def getFitParam(self): + """ + Gets the value of inputCol or its default value. + """ + return self.getOrDefault(self.fitParam) + + +class MapFnParam(Params): + mapFnParam = Param(Params._dummy(), "mapFnParam", "Tensorflow func", typeConverter=TypeConverters.identity) + + def __init__(self): + super(MapFnParam, self).__init__() + + def setMapFnParam(self, value): + """ + Sets the value of :py:attr:`inputCol`. + """ + return self._set(mapFnParam=value) + + def getMapFnParam(self): + """ + Gets the value of inputCol or its default value. + """ + return self.getOrDefault(self.mapFnParam) + + class HasInputCol(Params): """ Mixin for param inputCol: input column name. @@ -68,6 +129,42 @@ def getInputCol(self): return self.getOrDefault(self.inputCol) +class HasEmbeddingSize(Params): + """ + Mixin for param embeddingSize + """ + + embeddingSize = Param(Params._dummy(), "embeddingSize", "word embedding size", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasEmbeddingSize, self).__init__() + + def setEmbeddingSize(self, value): + return self._set(embeddingSize=value) + + def getEmbeddingSize(self): + return self.getOrDefault(self.embeddingSize) + + +class HasSequenceLength(Params): + """ + Mixin for param sequenceLength + """ + + sequenceLength = Param(Params._dummy(), "sequenceLength", "sequence length", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(HasSequenceLength, self).__init__() + + def setSequenceLength(self, value): + return self._set(sequenceLength=value) + + def getSequenceLength(self): + return self.getOrDefault(self.sequenceLength) + + class HasOutputCol(Params): """ Mixin for param outputCol: output column name. 
@@ -92,12 +189,12 @@ def getOutputCol(self): """ return self.getOrDefault(self.outputCol) + ############################################ # New in sparkdl ############################################ class SparkDLTypeConverters(object): - @staticmethod def toStringOrTFTensor(value): if isinstance(value, tf.Tensor): diff --git a/python/sparkdl/tf_fun.py b/python/sparkdl/tf_fun.py new file mode 100644 index 00000000..b870f5f8 --- /dev/null +++ b/python/sparkdl/tf_fun.py @@ -0,0 +1,90 @@ +def map_fun(_read_data, **args): + import tensorflow as tf + EMBEDDING_SIZE = args["embedding_size"] + feature = args['feature'] + label = args['label'] + params = args['params']['fitParam'] + SEQUENCE_LENGTH = 64 + + def feed_dict(batch): + # Convert from dict of named arrays to two numpy arrays of the proper type + features = [] + for i in batch: + features.append(i['sentence_matrix']) + + # print("{} {}".format(feature, features)) + return features + + encoder_variables_dict = { + "encoder_w1": tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), + "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), + "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), + "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") + } + + def encoder(x, name="encoder"): + with tf.name_scope(name): + encoder_w1 = encoder_variables_dict["encoder_w1"] + encoder_b1 = encoder_variables_dict["encoder_b1"] + + layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1) + + encoder_w2 = encoder_variables_dict["encoder_w2"] + encoder_b2 = encoder_variables_dict["encoder_b2"] + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2) + return layer_2 + + def decoder(x, name="decoder"): + with tf.name_scope(name): + decoder_w1 = tf.Variable(tf.random_normal([128, 256])) + decoder_b1 = tf.Variable(tf.random_normal([256])) + + layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1) + + decoder_w2 = tf.Variable( + tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE])) + decoder_b2 = tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE])) + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2) + return layer_2 + + tf.reset_default_graph + sess = tf.Session() + + input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x") + flattened = tf.reshape(input_x, + [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE]) + + encoder_op = encoder(flattened) + + tf.add_to_collection('encoder_op', encoder_op) + + y_pred = decoder(encoder_op) + + y_true = flattened + + with tf.name_scope("xent"): + consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1), + tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)), + tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1)))) + xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine)) + tf.summary.scalar("xent", xent) + + with tf.name_scope("train"): + # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent) + train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent) + + summ = tf.summary.merge_all() + + sess.run(tf.global_variables_initializer()) + + for i in range(params["epochs"]): + print("epoll {}".format(i)) + for data in _read_data(max_records=params["batch_size"]): + batch_data = feed_dict(data) + sess.run(train_step, feed_dict={input_x: batch_data}) + + sess.close() diff --git a/python/sparkdl/transformers/named_text.py b/python/sparkdl/transformers/named_text.py new file mode 100644 
index 00000000..ef51cd0c --- /dev/null +++ b/python/sparkdl/transformers/named_text.py @@ -0,0 +1,134 @@ +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from keras.applications.imagenet_utils import decode_predictions +import numpy as np + +from pyspark.ml import Transformer +from pyspark.ml.param import Param, Params, TypeConverters + +import sparkdl.graph.utils as tfx +from sparkdl.image.imageIO import resizeImage +import sparkdl.transformers.keras_applications as keras_apps +from sparkdl.param import ( + keyword_only, HasInputCol, HasOutputCol, SparkDLTypeConverters) +from sparkdl.transformers.tf_text import TFTextTransformer + +SUPPORTED_MODELS = ["CNN", "LSTM"] + + +class DeepTextFeaturizer(Transformer, HasInputCol, HasOutputCol): + """ + todo + """ + modelName = Param(Params._dummy(), "modelName", "A deep learning model name") + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, modelName=None): + """ + __init__(self, inputCol=None, outputCol=None, modelName=None) + """ + super(DeepTextFeaturizer, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, modelName=None): + """ + setParams(self, inputCol=None, outputCol=None, modelName=None) + """ + kwargs = self._input_kwargs + self._set(**kwargs) + return self + + def setModelName(self, value): + return self._set(modelName=value) + + def getModelName(self): + return self.getOrDefault(self.modelName) + + def _transform(self, dataset): + transformer = _NamedTextTransformer(inputCol=self.getInputCol(), + outputCol=self.getOutputCol(), + modelName=self.getModelName(), featurize=True) + return transformer.transform(dataset) + + +class _NamedTextTransformer(Transformer, HasInputCol, HasOutputCol): + modelName = Param(Params._dummy(), "modelName", "A deep learning model name", + typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) + featurize = Param(Params._dummy(), "featurize", + "If true, output features. If false, output predictions. 
Either way the output is a vector.", + typeConverter=TypeConverters.toBoolean) + + @keyword_only + def __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False): + """ + __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False) + """ + super(_NamedTextTransformer, self).__init__() + kwargs = self._input_kwargs + self.setParams(**kwargs) + self._inputTensorName = None + self._outputTensorName = None + self._outputMode = None + + @keyword_only + def setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False): + """ + setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False) + """ + kwargs = self._input_kwargs + self._set(**kwargs) + return self + + def setModelName(self, value): + return self._set(modelName=value) + + def getModelName(self): + return self.getOrDefault(self.modelName) + + def setFeaturize(self, value): + return self._set(featurize=value) + + def getFeaturize(self): + return self.getOrDefault(self.featurize) + + def _transform(self, dataset): + modelGraphSpec = _buildTFGraphForName(self.getModelName(), self.getFeaturize()) + inputCol = self.getInputCol() + resizedCol = "__sdl_textResized" + tfTransformer = TFTextTransformer(inputCol=resizedCol, + outputCol=self.getOutputCol(), + graph=modelGraphSpec["graph"], + inputTensor=modelGraphSpec["inputTensorName"], + outputTensor=modelGraphSpec["outputTensorName"], + outputMode=modelGraphSpec["outputMode"]) + resizeUdf = resizeImage(modelGraphSpec["inputTensorSize"]) + result = tfTransformer.transform(dataset.withColumn(resizedCol, resizeUdf(inputCol))) + return result.drop(resizedCol) + + +def _buildTFGraphForName(name, featurize): + """ + Currently only supports pre-trained models from the Keras applications module. + """ + modelData = keras_apps.getKerasApplicationModel(name).getModelData(featurize) + sess = modelData["session"] + outputTensorName = modelData["outputTensorName"] + graph = tfx.strip_and_freeze_until([outputTensorName], sess.graph, sess, return_graph=True) + modelData["graph"] = graph + + return modelData diff --git a/python/sparkdl/transformers/tf_image.py b/python/sparkdl/transformers/tf_image.py index da37fcad..2ca33846 100644 --- a/python/sparkdl/transformers/tf_image.py +++ b/python/sparkdl/transformers/tf_image.py @@ -120,7 +120,7 @@ def _transform(self, dataset): with final_graph.as_default(): image = dataset[self.getInputCol()] image_df_exploded = (dataset - .withColumn("__sdl_image_height", image.height) + .n("__sdl_image_height", image.height) .withColumn("__sdl_image_width", image.width) .withColumn("__sdl_image_nchannels", image.nChannels) .withColumn("__sdl_image_data", image.data) diff --git a/python/sparkdl/transformers/tf_text.py b/python/sparkdl/transformers/tf_text.py new file mode 100644 index 00000000..b040adc0 --- /dev/null +++ b/python/sparkdl/transformers/tf_text.py @@ -0,0 +1,91 @@ +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+import numpy as np
+from pyspark.ml import Transformer
+from pyspark.ml.feature import Word2Vec
+from pyspark.sql.functions import udf
+from pyspark.sql import functions as f
+from pyspark.sql.types import *
+from pyspark.sql.functions import lit
+from sparkdl.param.shared_params import HasEmbeddingSize, HasSequenceLength
+from sparkdl.param import (
+    keyword_only, HasInputCol, HasOutputCol)
+import re
+
+import sparkdl.utils.jvmapi as JVMAPI
+
+
+class TFTextTransformer(Transformer, HasInputCol, HasOutputCol, HasEmbeddingSize, HasSequenceLength):
+    """
+    Convert a sentence/document column into a 2-D array (e.g. [[word embedding], [...]]) in a DataFrame,
+    which can be consumed directly by TensorFlow, or by Keras with the TensorFlow backend.
+
+    Processing steps:
+
+    * Use Word2Vec to compute a Map(word -> vector) from the input column, then broadcast the map.
+    * Split the input text column on whitespace, replace each word with its vector, and pad the result
+      to the same length (sequenceLength).
+    * Create a new DataFrame containing the 2-D array column plus vocab_size and embedding_size columns.
+    * Return the new DataFrame.
+    """
+    VOCAB_SIZE = 'vocab_size'
+    EMBEDDING_SIZE = 'embedding_size'
+
+    @keyword_only
+    def __init__(self, inputCol=None, outputCol=None, embeddingSize=100, sequenceLength=64):
+        super(TFTextTransformer, self).__init__()
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, inputCol=None, outputCol=None, embeddingSize=100, sequenceLength=64):
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
+
+    def _transform(self, dataset):
+        word2vec = Word2Vec(vectorSize=self.getEmbeddingSize(), minCount=1, inputCol=self.getInputCol(),
+                            outputCol="word_embedding")
+        word_embedding = dict(
+            word2vec.fit(
+                dataset.select(f.split(self.getInputCol(), "\\s+").alias(self.getInputCol()))).getVectors().rdd.map(
+                lambda p: (p.word, p.vector.values.tolist())).collect())
+        word_embedding["unk"] = np.zeros(self.getEmbeddingSize()).tolist()
+        sc = JVMAPI._curr_sc()
+        local_word_embedding = sc.broadcast(word_embedding)
+
+        def convert_word_to_index(s):
+            def _pad_sequences(sequences, maxlen=None):
+                new_sequences = []
+
+                if len(sequences) <= maxlen:
+                    for i in range(maxlen - len(sequences)):
+                        new_sequences.append(np.zeros(self.getEmbeddingSize()).tolist())
+                    return sequences + new_sequences
+                else:
+                    return sequences[0:maxlen]
+
+            new_q = [local_word_embedding.value[word] for word in re.split(r"\s+", s) if
+                     word in local_word_embedding.value.keys()]
+            result = _pad_sequences(new_q, maxlen=self.getSequenceLength())
+            return result
+
+        cwti_udf = udf(convert_word_to_index, ArrayType(ArrayType(FloatType())))
+        doc_martic = (dataset.withColumn(self.getOutputCol(), cwti_udf(self.getInputCol()).alias(self.getOutputCol()))
+                      .withColumn(self.VOCAB_SIZE, lit(len(word_embedding)))
+                      .withColumn(self.EMBEDDING_SIZE, lit(self.getEmbeddingSize()))
+                      )
+
+        return doc_martic
diff --git a/python/sparkdl/transformers/utils.py b/python/sparkdl/transformers/utils.py
index b244365b..9964f3df 100644
--- a/python/sparkdl/transformers/utils.py
+++ b/python/sparkdl/transformers/utils.py
@@ -18,6 +18,8 @@
 # image stuff
 IMAGE_INPUT_PLACEHOLDER_NAME = "sparkdl_image_input"
+TEXT_INPUT_PLACEHOLDER_NAME = "sparkdl_text_input"
+
 
 def imageInputPlaceholder(nChannels=None):
     return tf.placeholder(tf.float32, [None, None, None, nChannels],
diff --git a/python/tests/Test.py b/python/tests/Test.py
new file mode 100644
index 00000000..6327cda4
--- /dev/null
+++ b/python/tests/Test.py
@@ -0,0 +1,30 @@
+import 
os +os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' + +from sparkdl import readImages +from pyspark.sql.functions import lit +from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.ml.classification import LogisticRegression +from pyspark.ml import Pipeline +from sparkdl import DeepImageFeaturizer + +img_dir="/Users/allwefantasy/resources/images/flower_photos" + +tulips_df = readImages(img_dir + "/tulips").withColumn("label", lit(1)) +daisy_df = readImages(img_dir + "/daisy").withColumn("label", lit(0)) + +tulips_train, tulips_test = tulips_df.randomSplit([0.6, 0.4]) +daisy_train, daisy_test = daisy_df.randomSplit([0.6, 0.4]) +train_df = tulips_train.unionAll(daisy_train) +test_df = tulips_test.unionAll(daisy_test) + +featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3") +lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label") +p = Pipeline(stages=[featurizer, lr]) + +p_model = p.fit(train_df) +tested_df = p_model.transform(test_df) +evaluator = MulticlassClassificationEvaluator(metricName="accuracy") +print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label")))) + +# h5py,pil \ No newline at end of file diff --git a/python/tests/Test2.py b/python/tests/Test2.py new file mode 100644 index 00000000..b535a602 --- /dev/null +++ b/python/tests/Test2.py @@ -0,0 +1,22 @@ +import os +from pyspark import SparkContext + +from sparkdl.transformers.tf_text import TFTextTransformer + +os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' + +input_col = "text" +output_col = "preds" + +sc = SparkContext.getOrCreate() +documentDF = sc.createDataFrame([ + ("Hi I heard about Spark".split(" "), 1), + ("I wish Java could use case classes".split(" "), 0), + ("Logistic regression models are neat".split(" "), 2) +], ["text", "preds"]) + +transformer = TFTextTransformer( + inputCol=input_col, outputCol=output_col) + +df = transformer.transform(documentDF) +df.show() \ No newline at end of file diff --git a/python/tests/resources/text/sample.txt b/python/tests/resources/text/sample.txt new file mode 100644 index 00000000..8c5e8d99 --- /dev/null +++ b/python/tests/resources/text/sample.txt @@ -0,0 +1,4 @@ +接下 来 介绍 一种 非常 重要 的 神经网络 卷积神经网络 +这种 神经 网络 在 计算机 视觉 领域 取得了 重大 的 成功,而且 在 自然语言 处理 等 其它 领域 也有 很好 应用 +深度学习 受到 大家 关注 很大 一个 原因 就是 Alex 实现 AlexNet( 一种 深度卷积神经网络 )在 LSVRC-2010 ImageNet +此后 卷积神经网络 及其 变种 被广泛 应用于 各种图像 相关 任务 \ No newline at end of file diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py new file mode 100644 index 00000000..0e8b359d --- /dev/null +++ b/python/tests/transformers/tf_text_test.py @@ -0,0 +1,126 @@ +# Copyright 2017 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import cPickle as pickle +import shutil +import threading + +from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator, KafkaMockServer +from sparkdl.transformers.tf_text import TFTextTransformer +from sparkdl.tf_fun import map_fun +from ..tests import SparkDLTestCase + + +class TFTextTransformerTest(SparkDLTestCase): + def test_convertText(self): + input_col = "text" + output_col = "sentence_matrix" + + documentDF = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + # transform text column to sentence_matrix column which contains 2-D array. + transformer = TFTextTransformer( + inputCol=input_col, outputCol=output_col, embeddingSize=100, sequenceLength=64) + + df = transformer.transform(documentDF) + data = df.collect() + self.assertEquals(len(data), 3) + for row in data: + self.assertEqual(len(row[output_col]), 64) + self.assertEqual(len(row[output_col][0]), 100) + + +class TFTextFileEstimatorTest(SparkDLTestCase): + def test_trainText(self): + import os + if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): + shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) + + input_col = "text" + output_col = "sentence_matrix" + + documentDF = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + # transform text column to sentence_matrix column which contains 2-D array. + transformer = TFTextTransformer( + inputCol=input_col, outputCol=output_col, embeddingSize=100, sequenceLength=64) + + df = transformer.transform(documentDF) + + # create a estimator to training where map_fun contains tensorflow's code + estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds", + kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", + "group_id": "sdl_1", "test_mode": False}, + fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], + mapFnParam=map_fun) + estimator.fit(df).collect() + + +class MockKakfaServerTest(SparkDLTestCase): + def test_mockKafkaServerProduce(self): + dataset = self.session.createDataFrame([ + ("Hi I heard about Spark", 1), + ("I wish Java could use case classes", 0), + ("Logistic regression models are neat", 2) + ], ["text", "preds"]) + + def _write_data(): + def _write_partition(index, d_iter): + producer = KafkaMockServer(index) + try: + for d in d_iter: + producer.send("", pickle.dumps(d)) + producer.send("", pickle.dumps("_stop_")) + producer.flush() + finally: + producer.close() + return [] + + dataset.rdd.mapPartitionsWithIndex(_write_partition).count() + + _write_data() + + def _consume(): + consumer = KafkaMockServer() + stop_count = 0 + while True: + messages = consumer.poll(timeout_ms=1000, max_records=64) + group_msgs = [] + for tp, records in messages.items(): + for record in records: + try: + msg_value = pickle.loads(record.value) + print(msg_value) + if msg_value == "_stop_": + stop_count += 1 + else: + group_msgs.append(msg_value) + except: + pass + if stop_count >= 8: + break + self.assertEquals(stop_count, 8) + + t = threading.Thread(target=_consume) + t.start() + t2 = threading.Thread(target=_consume) + t2.start() From 08e61f34c3329210cb1fc9f4d0365dfe41d00aaa Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Fri, 13 Oct 2017 17:28:39 +0800 Subject: [PATCH 06/20] set test_mode to True which can avoid 
to kafka dependency --- python/tests/transformers/tf_text_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 0e8b359d..26f31d1f 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -69,7 +69,7 @@ def test_trainText(self): # create a estimator to training where map_fun contains tensorflow's code estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds", kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", - "group_id": "sdl_1", "test_mode": False}, + "group_id": "sdl_1", "test_mode": True}, fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], mapFnParam=map_fun) estimator.fit(df).collect() From e51c508b50baaf0458add2f216e48aaeb607ba7a Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Fri, 13 Oct 2017 18:44:23 +0800 Subject: [PATCH 07/20] clean some file --- python/sparkdl/transformers/named_text.py | 134 ---------------------- python/sparkdl/transformers/tf_image.py | 2 +- python/tests/Test.py | 30 ----- python/tests/Test2.py | 22 ---- 4 files changed, 1 insertion(+), 187 deletions(-) delete mode 100644 python/sparkdl/transformers/named_text.py delete mode 100644 python/tests/Test.py delete mode 100644 python/tests/Test2.py diff --git a/python/sparkdl/transformers/named_text.py b/python/sparkdl/transformers/named_text.py deleted file mode 100644 index ef51cd0c..00000000 --- a/python/sparkdl/transformers/named_text.py +++ /dev/null @@ -1,134 +0,0 @@ -# Copyright 2017 Databricks, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -from keras.applications.imagenet_utils import decode_predictions -import numpy as np - -from pyspark.ml import Transformer -from pyspark.ml.param import Param, Params, TypeConverters - -import sparkdl.graph.utils as tfx -from sparkdl.image.imageIO import resizeImage -import sparkdl.transformers.keras_applications as keras_apps -from sparkdl.param import ( - keyword_only, HasInputCol, HasOutputCol, SparkDLTypeConverters) -from sparkdl.transformers.tf_text import TFTextTransformer - -SUPPORTED_MODELS = ["CNN", "LSTM"] - - -class DeepTextFeaturizer(Transformer, HasInputCol, HasOutputCol): - """ - todo - """ - modelName = Param(Params._dummy(), "modelName", "A deep learning model name") - - @keyword_only - def __init__(self, inputCol=None, outputCol=None, modelName=None): - """ - __init__(self, inputCol=None, outputCol=None, modelName=None) - """ - super(DeepTextFeaturizer, self).__init__() - kwargs = self._input_kwargs - self.setParams(**kwargs) - - @keyword_only - def setParams(self, inputCol=None, outputCol=None, modelName=None): - """ - setParams(self, inputCol=None, outputCol=None, modelName=None) - """ - kwargs = self._input_kwargs - self._set(**kwargs) - return self - - def setModelName(self, value): - return self._set(modelName=value) - - def getModelName(self): - return self.getOrDefault(self.modelName) - - def _transform(self, dataset): - transformer = _NamedTextTransformer(inputCol=self.getInputCol(), - outputCol=self.getOutputCol(), - modelName=self.getModelName(), featurize=True) - return transformer.transform(dataset) - - -class _NamedTextTransformer(Transformer, HasInputCol, HasOutputCol): - modelName = Param(Params._dummy(), "modelName", "A deep learning model name", - typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) - featurize = Param(Params._dummy(), "featurize", - "If true, output features. If false, output predictions. 
Either way the output is a vector.", - typeConverter=TypeConverters.toBoolean) - - @keyword_only - def __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False): - """ - __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False) - """ - super(_NamedTextTransformer, self).__init__() - kwargs = self._input_kwargs - self.setParams(**kwargs) - self._inputTensorName = None - self._outputTensorName = None - self._outputMode = None - - @keyword_only - def setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False): - """ - setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False) - """ - kwargs = self._input_kwargs - self._set(**kwargs) - return self - - def setModelName(self, value): - return self._set(modelName=value) - - def getModelName(self): - return self.getOrDefault(self.modelName) - - def setFeaturize(self, value): - return self._set(featurize=value) - - def getFeaturize(self): - return self.getOrDefault(self.featurize) - - def _transform(self, dataset): - modelGraphSpec = _buildTFGraphForName(self.getModelName(), self.getFeaturize()) - inputCol = self.getInputCol() - resizedCol = "__sdl_textResized" - tfTransformer = TFTextTransformer(inputCol=resizedCol, - outputCol=self.getOutputCol(), - graph=modelGraphSpec["graph"], - inputTensor=modelGraphSpec["inputTensorName"], - outputTensor=modelGraphSpec["outputTensorName"], - outputMode=modelGraphSpec["outputMode"]) - resizeUdf = resizeImage(modelGraphSpec["inputTensorSize"]) - result = tfTransformer.transform(dataset.withColumn(resizedCol, resizeUdf(inputCol))) - return result.drop(resizedCol) - - -def _buildTFGraphForName(name, featurize): - """ - Currently only supports pre-trained models from the Keras applications module. 
- """ - modelData = keras_apps.getKerasApplicationModel(name).getModelData(featurize) - sess = modelData["session"] - outputTensorName = modelData["outputTensorName"] - graph = tfx.strip_and_freeze_until([outputTensorName], sess.graph, sess, return_graph=True) - modelData["graph"] = graph - - return modelData diff --git a/python/sparkdl/transformers/tf_image.py b/python/sparkdl/transformers/tf_image.py index 2ca33846..da37fcad 100644 --- a/python/sparkdl/transformers/tf_image.py +++ b/python/sparkdl/transformers/tf_image.py @@ -120,7 +120,7 @@ def _transform(self, dataset): with final_graph.as_default(): image = dataset[self.getInputCol()] image_df_exploded = (dataset - .n("__sdl_image_height", image.height) + .withColumn("__sdl_image_height", image.height) .withColumn("__sdl_image_width", image.width) .withColumn("__sdl_image_nchannels", image.nChannels) .withColumn("__sdl_image_data", image.data) diff --git a/python/tests/Test.py b/python/tests/Test.py deleted file mode 100644 index 6327cda4..00000000 --- a/python/tests/Test.py +++ /dev/null @@ -1,30 +0,0 @@ -import os -os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' - -from sparkdl import readImages -from pyspark.sql.functions import lit -from pyspark.ml.evaluation import MulticlassClassificationEvaluator -from pyspark.ml.classification import LogisticRegression -from pyspark.ml import Pipeline -from sparkdl import DeepImageFeaturizer - -img_dir="/Users/allwefantasy/resources/images/flower_photos" - -tulips_df = readImages(img_dir + "/tulips").withColumn("label", lit(1)) -daisy_df = readImages(img_dir + "/daisy").withColumn("label", lit(0)) - -tulips_train, tulips_test = tulips_df.randomSplit([0.6, 0.4]) -daisy_train, daisy_test = daisy_df.randomSplit([0.6, 0.4]) -train_df = tulips_train.unionAll(daisy_train) -test_df = tulips_test.unionAll(daisy_test) - -featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3") -lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label") -p = Pipeline(stages=[featurizer, lr]) - -p_model = p.fit(train_df) -tested_df = p_model.transform(test_df) -evaluator = MulticlassClassificationEvaluator(metricName="accuracy") -print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label")))) - -# h5py,pil \ No newline at end of file diff --git a/python/tests/Test2.py b/python/tests/Test2.py deleted file mode 100644 index b535a602..00000000 --- a/python/tests/Test2.py +++ /dev/null @@ -1,22 +0,0 @@ -import os -from pyspark import SparkContext - -from sparkdl.transformers.tf_text import TFTextTransformer - -os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python' - -input_col = "text" -output_col = "preds" - -sc = SparkContext.getOrCreate() -documentDF = sc.createDataFrame([ - ("Hi I heard about Spark".split(" "), 1), - ("I wish Java could use case classes".split(" "), 0), - ("Logistic regression models are neat".split(" "), 2) -], ["text", "preds"]) - -transformer = TFTextTransformer( - inputCol=input_col, outputCol=output_col) - -df = transformer.transform(documentDF) -df.show() \ No newline at end of file From 65a469497037749ca813c68dc389b3e9168b640c Mon Sep 17 00:00:00 2001 From: Philip Yang Date: Sat, 14 Oct 2017 09:12:50 -0700 Subject: [PATCH 08/20] [#55] fix TFImageTransformer example in docs (#58) --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5cd68ba7..fa4a16d9 100644 --- 
a/README.md +++ b/README.md @@ -131,16 +131,16 @@ Spark DataFrames are a natural construct for applying deep learning models to a ```python from sparkdl import readImages, TFImageTransformer + import sparkdl.graph.utils as tfx from sparkdl.transformers import utils import tensorflow as tf - g = tf.Graph() - with g.as_default(): + graph = tf.Graph() + with tf.Session(graph=graph) as sess: image_arr = utils.imageInputPlaceholder() resized_images = tf.image.resize_images(image_arr, (299, 299)) - # the following step is not necessary for this graph, but can be for graphs with variables, etc - frozen_graph = utils.stripAndFreezeGraph(g.as_graph_def(add_shapes=True), tf.Session(graph=g), - [resized_images]) + frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess, + return_graph=True) transformer = TFImageTransformer(inputCol="image", outputCol="predictions", graph=frozen_graph, inputTensor=image_arr, outputTensor=resized_images, @@ -241,7 +241,7 @@ registerKerasImageUDF("my_keras_inception_udf", InceptionV3(weights="imagenet"), ``` +### Estimator ## Releases: * 0.1.0 initial release - From b812764cdef23d7523490babf47a6ab970386186 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 10:25:48 +0800 Subject: [PATCH 09/20] move tensorflow map_fun to tf_text_test.py and modify the signature to support integrating TFoS infuture --- python/requirements.txt | 1 + .../estimators/tf_text_file_estimator.py | 11 +-- python/sparkdl/tf_fun.py | 90 ------------------ python/tests/transformers/tf_text_test.py | 93 ++++++++++++++++++- 4 files changed, 97 insertions(+), 98 deletions(-) delete mode 100644 python/sparkdl/tf_fun.py diff --git a/python/requirements.txt b/python/requirements.txt index a98a4d17..39981df5 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -9,3 +9,4 @@ pygments>=2.2.0 tensorflow==1.3.0 pandas>=0.19.1 six>=1.10.0 +kafka-python>=1.3.5 diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index 278ab8e5..1f2fb116 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -245,12 +245,11 @@ def _read_data(max_records=64): finally: consumer.close() - self.getMapFnParam()(_read_data, - feature=inputCol, - label=labelCol, - vacab_size=vocab_s, - embedding_size=embedding_size, - params=params + self.getMapFnParam()(args={"feature": inputCol, + "label": labelCol, + "vacab_size": vocab_s, + "embedding_size": embedding_size, + "params": params}, ctx=None, _read_data=_read_data, ) return paramMapsRDD.map(lambda paramMap: (paramMap, _local_fit(paramMap))) diff --git a/python/sparkdl/tf_fun.py b/python/sparkdl/tf_fun.py deleted file mode 100644 index b870f5f8..00000000 --- a/python/sparkdl/tf_fun.py +++ /dev/null @@ -1,90 +0,0 @@ -def map_fun(_read_data, **args): - import tensorflow as tf - EMBEDDING_SIZE = args["embedding_size"] - feature = args['feature'] - label = args['label'] - params = args['params']['fitParam'] - SEQUENCE_LENGTH = 64 - - def feed_dict(batch): - # Convert from dict of named arrays to two numpy arrays of the proper type - features = [] - for i in batch: - features.append(i['sentence_matrix']) - - # print("{} {}".format(feature, features)) - return features - - encoder_variables_dict = { - "encoder_w1": tf.Variable( - tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), - "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), - "encoder_w2": 
tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), - "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") - } - - def encoder(x, name="encoder"): - with tf.name_scope(name): - encoder_w1 = encoder_variables_dict["encoder_w1"] - encoder_b1 = encoder_variables_dict["encoder_b1"] - - layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1) - - encoder_w2 = encoder_variables_dict["encoder_w2"] - encoder_b2 = encoder_variables_dict["encoder_b2"] - - layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2) - return layer_2 - - def decoder(x, name="decoder"): - with tf.name_scope(name): - decoder_w1 = tf.Variable(tf.random_normal([128, 256])) - decoder_b1 = tf.Variable(tf.random_normal([256])) - - layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1) - - decoder_w2 = tf.Variable( - tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE])) - decoder_b2 = tf.Variable( - tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE])) - - layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2) - return layer_2 - - tf.reset_default_graph - sess = tf.Session() - - input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x") - flattened = tf.reshape(input_x, - [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE]) - - encoder_op = encoder(flattened) - - tf.add_to_collection('encoder_op', encoder_op) - - y_pred = decoder(encoder_op) - - y_true = flattened - - with tf.name_scope("xent"): - consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1), - tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)), - tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1)))) - xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine)) - tf.summary.scalar("xent", xent) - - with tf.name_scope("train"): - # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent) - train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent) - - summ = tf.summary.merge_all() - - sess.run(tf.global_variables_initializer()) - - for i in range(params["epochs"]): - print("epoll {}".format(i)) - for data in _read_data(max_records=params["batch_size"]): - batch_data = feed_dict(data) - sess.run(train_step, feed_dict={input_x: batch_data}) - - sess.close() diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 26f31d1f..25cb1e0d 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -18,10 +18,99 @@ from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator, KafkaMockServer from sparkdl.transformers.tf_text import TFTextTransformer -from sparkdl.tf_fun import map_fun from ..tests import SparkDLTestCase +def map_fun(args={}, ctx=None, _read_data=None): + import tensorflow as tf + EMBEDDING_SIZE = args["embedding_size"] + params = args['params']['fitParam'] + SEQUENCE_LENGTH = 64 + + def feed_dict(batch): + # Convert from dict of named arrays to two numpy arrays of the proper type + features = [] + for i in batch: + features.append(i['sentence_matrix']) + + # print("{} {}".format(feature, features)) + return features + + encoder_variables_dict = { + "encoder_w1": tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"), + "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"), + "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"), + "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2") + } + + def encoder(x, name="encoder"): + with 
tf.name_scope(name): + encoder_w1 = encoder_variables_dict["encoder_w1"] + encoder_b1 = encoder_variables_dict["encoder_b1"] + + layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1) + + encoder_w2 = encoder_variables_dict["encoder_w2"] + encoder_b2 = encoder_variables_dict["encoder_b2"] + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2) + return layer_2 + + def decoder(x, name="decoder"): + with tf.name_scope(name): + decoder_w1 = tf.Variable(tf.random_normal([128, 256])) + decoder_b1 = tf.Variable(tf.random_normal([256])) + + layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1) + + decoder_w2 = tf.Variable( + tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE])) + decoder_b2 = tf.Variable( + tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE])) + + layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2) + return layer_2 + + tf.reset_default_graph + sess = tf.Session() + + input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x") + flattened = tf.reshape(input_x, + [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE]) + + encoder_op = encoder(flattened) + + tf.add_to_collection('encoder_op', encoder_op) + + y_pred = decoder(encoder_op) + + y_true = flattened + + with tf.name_scope("xent"): + consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1), + tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)), + tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1)))) + xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine)) + tf.summary.scalar("xent", xent) + + with tf.name_scope("train"): + # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent) + train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent) + + summ = tf.summary.merge_all() + + sess.run(tf.global_variables_initializer()) + + for i in range(params["epochs"]): + print("epoll {}".format(i)) + for data in _read_data(max_records=params["batch_size"]): + batch_data = feed_dict(data) + sess.run(train_step, feed_dict={input_x: batch_data}) + + sess.close() + + class TFTextTransformerTest(SparkDLTestCase): def test_convertText(self): input_col = "text" @@ -48,7 +137,7 @@ def test_convertText(self): class TFTextFileEstimatorTest(SparkDLTestCase): def test_trainText(self): import os - if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): + if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) input_col = "text" From e277b24b0889005da2b3387edb11c569e0118aeb Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 11:07:40 +0800 Subject: [PATCH 10/20] fix code style in TFTextTransformer --- python/sparkdl/transformers/tf_text.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/sparkdl/transformers/tf_text.py b/python/sparkdl/transformers/tf_text.py index b040adc0..c224ec34 100644 --- a/python/sparkdl/transformers/tf_text.py +++ b/python/sparkdl/transformers/tf_text.py @@ -58,10 +58,12 @@ def setParams(self, inputCol=None, outputCol=None, embeddingSize=100, sequenceLe def _transform(self, dataset): word2vec = Word2Vec(vectorSize=self.getEmbeddingSize(), minCount=1, inputCol=self.getInputCol(), outputCol="word_embedding") - word_embedding = dict( - word2vec.fit( - dataset.select(f.split(self.getInputCol(), "\\s+").alias(self.getInputCol()))).getVectors().rdd.map( - lambda p: (p.word, p.vector.values.tolist())).collect()) + word2vecModel = word2vec.fit( + 
dataset.select(f.split(self.getInputCol(), "\\s+").alias(self.getInputCol()))) + + word_embedding = dict(word2vecModel.getVectors().rdd.map( + lambda p: (p.word, p.vector.values.tolist())).collect()) + word_embedding["unk"] = np.zeros(self.getEmbeddingSize()).tolist() sc = JVMAPI._curr_sc() local_word_embedding = sc.broadcast(word_embedding) From edd359c2911dd9aa45de75ca0b297dccdd68c265 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 15:03:23 +0800 Subject: [PATCH 11/20] make sure TFTextTransformer will pass the ./python/run-tests.sh --- .gitignore | 1 + python/sparkdl/transformers/tf_text.py | 21 +++++++++++++++++---- python/tests/transformers/tf_text_test.py | 3 ++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 26ae9a84..7f594401 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ README.org .cache/ .history/ .lib/ +.coverage dist/* target/ lib_managed/ diff --git a/python/sparkdl/transformers/tf_text.py b/python/sparkdl/transformers/tf_text.py index c224ec34..dc4fc40d 100644 --- a/python/sparkdl/transformers/tf_text.py +++ b/python/sparkdl/transformers/tf_text.py @@ -56,16 +56,29 @@ def setParams(self, inputCol=None, outputCol=None, embeddingSize=100, sequenceLe return self._set(**kwargs) def _transform(self, dataset): + + sc = JVMAPI._curr_sc() + word2vec = Word2Vec(vectorSize=self.getEmbeddingSize(), minCount=1, inputCol=self.getInputCol(), outputCol="word_embedding") - word2vecModel = word2vec.fit( - dataset.select(f.split(self.getInputCol(), "\\s+").alias(self.getInputCol()))) - word_embedding = dict(word2vecModel.getVectors().rdd.map( + vectorsDf = word2vec.fit( + dataset.select(f.split(self.getInputCol(), "\\s+").alias(self.getInputCol()))).getVectors() + + """ + It's strange here that after calling getVectors the df._sc._jsc will lose and this is + only happens when you run it with ./python/run-tests.sh script. + We add this code to make it pass the test. However it seems this will hit + "org.apache.spark.SparkException: EOF reached before Python server acknowledged" error. 
+ """ + if vectorsDf._sc._jsc is None: + vectorsDf._sc._jsc = sc._jsc + + word_embedding = dict(vectorsDf.rdd.map( lambda p: (p.word, p.vector.values.tolist())).collect()) word_embedding["unk"] = np.zeros(self.getEmbeddingSize()).tolist() - sc = JVMAPI._curr_sc() + local_word_embedding = sc.broadcast(word_embedding) def convert_word_to_index(s): diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 25cb1e0d..62e44f15 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -16,6 +16,8 @@ import shutil import threading +from pyspark import SparkContext, SQLContext + from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator, KafkaMockServer from sparkdl.transformers.tf_text import TFTextTransformer from ..tests import SparkDLTestCase @@ -115,7 +117,6 @@ class TFTextTransformerTest(SparkDLTestCase): def test_convertText(self): input_col = "text" output_col = "sentence_matrix" - documentDF = self.session.createDataFrame([ ("Hi I heard about Spark", 1), ("I wish Java could use case classes", 0), From b2550c38aca9c5020689a9ade2f22ad508290467 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 15:08:02 +0800 Subject: [PATCH 12/20] fix conflict --- python/tests/transformers/tf_text_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 84829c82..25cb1e0d 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -115,7 +115,7 @@ class TFTextTransformerTest(SparkDLTestCase): def test_convertText(self): input_col = "text" output_col = "sentence_matrix" - + documentDF = self.session.createDataFrame([ ("Hi I heard about Spark", 1), ("I wish Java could use case classes", 0), From ddc1b7bc219a6929cbecb2dd50aed0fa97957b50 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 16:07:01 +0800 Subject: [PATCH 13/20] fix pickle in python 3 --- python/sparkdl/estimators/tf_text_file_estimator.py | 9 ++++++--- python/tests/transformers/tf_text_test.py | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index 1f2fb116..b76f47c5 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -21,9 +21,7 @@ import threading import time import os -import shutil - -import cPickle as pickle +import sys from kafka import KafkaConsumer from kafka import KafkaProducer @@ -34,6 +32,11 @@ from sparkdl.param.shared_params import KafkaParam, FitParam, MapFnParam import sparkdl.utils.jvmapi as JVMAPI +if sys.version_info[:2] <= (2, 7): + import cPickle as pickle +else: + import _pickle as pickle + __all__ = ['TFTextFileEstimator'] logger = logging.getLogger('sparkdl') diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 25cb1e0d..c523f440 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import cPickle as pickle import shutil import threading @@ -20,6 +19,10 @@ from sparkdl.transformers.tf_text import TFTextTransformer from ..tests import SparkDLTestCase +if sys.version_info[:2] <= (2, 7): + import cPickle as pickle +else: + import _pickle as pickle def map_fun(args={}, ctx=None, _read_data=None): import tensorflow as tf From eeb462b3a4f8b3513b2fa466f709ba7ac2b71c47 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 17:20:11 +0800 Subject: [PATCH 14/20] import sys --- python/tests/transformers/tf_text_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index c523f440..080a3c1f 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -14,7 +14,7 @@ # import shutil import threading - +import sys from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator, KafkaMockServer from sparkdl.transformers.tf_text import TFTextTransformer from ..tests import SparkDLTestCase @@ -24,6 +24,7 @@ else: import _pickle as pickle + def map_fun(args={}, ctx=None, _read_data=None): import tensorflow as tf EMBEDDING_SIZE = args["embedding_size"] From d32381df25d612a06e14464e14bddabd590c0145 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 17:23:20 +0800 Subject: [PATCH 15/20] rm /tmp/mock_kafka before run test --- python/tests/transformers/tf_text_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 080a3c1f..95ff6d06 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -170,6 +170,10 @@ def test_trainText(self): class MockKakfaServerTest(SparkDLTestCase): def test_mockKafkaServerProduce(self): + import os + if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): + shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) + dataset = self.session.createDataFrame([ ("Hi I heard about Spark", 1), ("I wish Java could use case classes", 0), From 99ab371d4167cf3723d1a60fc5269e685d04ed24 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 18:10:40 +0800 Subject: [PATCH 16/20] kafka temp directory using tempfile.mkdtemp --- python/sparkdl/estimators/tf_text_file_estimator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index b76f47c5..6bb9dff9 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -269,7 +269,8 @@ class KafkaMockServer(object): * Make sure all data have been writen before consume. * Poll function will just ignore max_records and just return all data in queue. 
""" - _kafka_mock_server_tmp_file_ = "/tmp/mock-kafka/" + import tempfile + _kafka_mock_server_tmp_file_ = tempfile.mkdtemp() sended = False def __init__(self, index=0): From 67f5f30a3e23ed9d30f62b289c1f7713b5b555c4 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 19:39:02 +0800 Subject: [PATCH 17/20] fix kafka tmp file --- .../estimators/tf_text_file_estimator.py | 26 ++++++++++--------- python/tests/transformers/tf_text_test.py | 22 ++++++++-------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index 6bb9dff9..c131ca47 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -163,10 +163,11 @@ def _fitInParallel(self, dataset, paramMaps): group_id = kafaParams["group_id"] bootstrap_servers = kafaParams["bootstrap_servers"] kafka_test_mode = kafaParams["test_mode"] if "test_mode" in kafaParams else False + mock_kafka_file = kafaParams["mock_kafka_file"] if kafka_test_mode else None def _write_data(): def _write_partition(index, d_iter): - producer = KafkaMockServer(index) if kafka_test_mode else KafkaProducer( + producer = KafkaMockServer(index, mock_kafka_file) if kafka_test_mode else KafkaProducer( bootstrap_servers=bootstrap_servers) try: for d in d_iter: @@ -205,12 +206,12 @@ def _local_fit(override_param_map): params["fitParam"] = override_param_map def _read_data(max_records=64): - consumer = KafkaMockServer() if kafka_test_mode else KafkaConsumer(topic, - group_id=group_id, - bootstrap_servers=bootstrap_servers, - auto_offset_reset="earliest", - enable_auto_commit=False - ) + consumer = KafkaMockServer(0, mock_kafka_file) if kafka_test_mode else KafkaConsumer(topic, + group_id=group_id, + bootstrap_servers=bootstrap_servers, + auto_offset_reset="earliest", + enable_auto_commit=False + ) try: stop_count = 0 fail_msg_count = 0 @@ -269,14 +270,15 @@ class KafkaMockServer(object): * Make sure all data have been writen before consume. * Poll function will just ignore max_records and just return all data in queue. 
""" - import tempfile - _kafka_mock_server_tmp_file_ = tempfile.mkdtemp() + + _kafka_mock_server_tmp_file_ = None sended = False - def __init__(self, index=0): + def __init__(self, index=0, tmp_file=None): super(KafkaMockServer, self).__init__() self.index = index self.queue = [] + self._kafka_mock_server_tmp_file_ = tmp_file if not os.path.exists(self._kafka_mock_server_tmp_file_): os.mkdir(self._kafka_mock_server_tmp_file_) @@ -284,7 +286,7 @@ def send(self, topic, msg): self.queue.append(pickle.loads(msg)) def flush(self): - with open(self._kafka_mock_server_tmp_file_ + str(self.index), "w") as f: + with open(self._kafka_mock_server_tmp_file_ + "/" + str(self.index), "w") as f: pickle.dump(self.queue, f) self.queue = [] @@ -297,7 +299,7 @@ def poll(self, timeout_ms, max_records): records = [] for file in os.listdir(self._kafka_mock_server_tmp_file_): - with open(self._kafka_mock_server_tmp_file_ + file) as f: + with open(self._kafka_mock_server_tmp_file_ + "/" + file) as f: tmp = pickle.load(f) records += tmp result = {} diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index 95ff6d06..b058295b 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -140,10 +140,6 @@ def test_convertText(self): class TFTextFileEstimatorTest(SparkDLTestCase): def test_trainText(self): - import os - if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): - shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) - input_col = "text" output_col = "sentence_matrix" @@ -158,22 +154,23 @@ def test_trainText(self): inputCol=input_col, outputCol=output_col, embeddingSize=100, sequenceLength=64) df = transformer.transform(documentDF) - + import tempfile + mock_kafka_file = tempfile.mkdtemp() # create a estimator to training where map_fun contains tensorflow's code estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds", kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test", + "mock_kafka_file": mock_kafka_file, "group_id": "sdl_1", "test_mode": True}, fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}], mapFnParam=map_fun) estimator.fit(df).collect() + shutil.rmtree(mock_kafka_file) class MockKakfaServerTest(SparkDLTestCase): def test_mockKafkaServerProduce(self): - import os - if os.path.exists(KafkaMockServer()._kafka_mock_server_tmp_file_): - shutil.rmtree(KafkaMockServer()._kafka_mock_server_tmp_file_) - + import tempfile + mock_kafka_file = tempfile.mkdtemp() dataset = self.session.createDataFrame([ ("Hi I heard about Spark", 1), ("I wish Java could use case classes", 0), @@ -182,7 +179,7 @@ def test_mockKafkaServerProduce(self): def _write_data(): def _write_partition(index, d_iter): - producer = KafkaMockServer(index) + producer = KafkaMockServer(index, mock_kafka_file) try: for d in d_iter: producer.send("", pickle.dumps(d)) @@ -197,7 +194,7 @@ def _write_partition(index, d_iter): _write_data() def _consume(): - consumer = KafkaMockServer() + consumer = KafkaMockServer(0, mock_kafka_file) stop_count = 0 while True: messages = consumer.poll(timeout_ms=1000, max_records=64) @@ -221,3 +218,6 @@ def _consume(): t.start() t2 = threading.Thread(target=_consume) t2.start() + import time + time.sleep(10) + shutil.rmtree(mock_kafka_file) From d8543792e58e12bfb8cba7ee409bcf9f25682464 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 20:06:35 +0800 Subject: [PATCH 18/20] remove cpickle when using python 3 
--- python/sparkdl/estimators/tf_text_file_estimator.py | 2 +- python/tests/transformers/tf_text_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index c131ca47..48e92569 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -35,7 +35,7 @@ if sys.version_info[:2] <= (2, 7): import cPickle as pickle else: - import _pickle as pickle + import pickle __all__ = ['TFTextFileEstimator'] diff --git a/python/tests/transformers/tf_text_test.py b/python/tests/transformers/tf_text_test.py index b058295b..99c505be 100644 --- a/python/tests/transformers/tf_text_test.py +++ b/python/tests/transformers/tf_text_test.py @@ -22,7 +22,7 @@ if sys.version_info[:2] <= (2, 7): import cPickle as pickle else: - import _pickle as pickle + import pickle def map_fun(args={}, ctx=None, _read_data=None): From b0aac3625d3073cbb8d5bb40501095a7d8eb4637 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 20:33:42 +0800 Subject: [PATCH 19/20] pickle write file with wb mode --- python/sparkdl/estimators/tf_text_file_estimator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index 48e92569..48750ec0 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -286,7 +286,7 @@ def send(self, topic, msg): self.queue.append(pickle.loads(msg)) def flush(self): - with open(self._kafka_mock_server_tmp_file_ + "/" + str(self.index), "w") as f: + with open(self._kafka_mock_server_tmp_file_ + "/" + str(self.index), "wb") as f: pickle.dump(self.queue, f) self.queue = [] From 99d2b30a1f42c4b19837282584d3afd988e0790b Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 18 Oct 2017 21:19:14 +0800 Subject: [PATCH 20/20] read pickle file with rb --- python/sparkdl/estimators/tf_text_file_estimator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/sparkdl/estimators/tf_text_file_estimator.py b/python/sparkdl/estimators/tf_text_file_estimator.py index 48750ec0..3f35e0be 100644 --- a/python/sparkdl/estimators/tf_text_file_estimator.py +++ b/python/sparkdl/estimators/tf_text_file_estimator.py @@ -299,7 +299,7 @@ def poll(self, timeout_ms, max_records): records = [] for file in os.listdir(self._kafka_mock_server_tmp_file_): - with open(self._kafka_mock_server_tmp_file_ + "/" + file) as f: + with open(self._kafka_mock_server_tmp_file_ + "/" + file, "rb") as f: tmp = pickle.load(f) records += tmp result = {}
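For reference, the sketch below shows how the pieces introduced by this patch series fit together end to end: TFTextTransformer turns a text column into a sentence matrix, and TFTextFileEstimator streams that matrix through Kafka (here the mock server, via test_mode) into a user-supplied map function. It mirrors TFTextFileEstimatorTest.test_trainText and the estimator docstring; the local SparkSession setup, the two sample rows, and the `my_map_fun` stub are illustrative assumptions rather than code taken from the patches.

.. code-block:: python

    import shutil
    import tempfile

    from pyspark.sql import SparkSession

    from sparkdl.transformers.tf_text import TFTextTransformer
    from sparkdl.estimators.tf_text_file_estimator import TFTextFileEstimator


    def my_map_fun(args={}, ctx=None, _read_data=None):
        # Signature expected since PATCH 09: `args` carries feature/label column names,
        # vocab/embedding sizes and the per-run "fitParam" dict; `_read_data` yields
        # batches of rows pulled from Kafka (or from KafkaMockServer in test mode).
        fit_param = args["params"]["fitParam"]
        for epoch in range(fit_param["epochs"]):
            for batch in _read_data(max_records=fit_param["batch_size"]):
                # placeholder: run one training step on
                # [row["sentence_matrix"] for row in batch]
                pass


    session = SparkSession.builder.master("local[2]").appName("tf-text-demo").getOrCreate()
    df = session.createDataFrame([
        ("Hi I heard about Spark", 1),
        ("I wish Java could use case classes", 0),
    ], ["text", "preds"])

    # text -> 2-D word-embedding matrix column (plus vocab_size / embedding_size columns)
    matrix_df = TFTextTransformer(inputCol="text", outputCol="sentence_matrix",
                                  embeddingSize=100, sequenceLength=64).transform(df)

    mock_kafka_file = tempfile.mkdtemp()  # stands in for a real broker when test_mode=True
    estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix",
                                    labelCol="preds",
                                    kafkaParam={"bootstrap_servers": ["127.0.0.1"],
                                                "topic": "test", "group_id": "sdl_1",
                                                "test_mode": True,
                                                "mock_kafka_file": mock_kafka_file},
                                    fitParam=[{"epochs": 1, "batch_size": 2}],
                                    mapFnParam=my_map_fun)
    estimator.fit(matrix_df).collect()  # fit() returns an RDD of (paramMap, result) pairs
    shutil.rmtree(mock_kafka_file)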