Commit 176dcde6 authored by Federico Mestrone

Improved usability of properties in Dataflow

parent 86259ddf
#!/usr/bin/env bash
mvn compile exec:java -e \
-Dexec.mainClass=gcp.cm.bigdata.adtech.dataflow.CleaningPipeline \
-Dexec.args="gs://bucket-for-codemotion/adtech/test \
gs://bucket-for-codemotion/adtech/test-df-out.csv \
--project=qwiklabs-gcp-1b28c6712b534a2b \
--stagingLocation=gs://bucket-for-codemotion/dataflow/staging/ \
--tempLocation=gs://bucket-for-codemotion/dataflow/temp/ \
--runner=DataflowRunner"
-Dexec.mainClass=gcp.cm.bigdata.adtech.dataflow.BatchPipeline \
-Dexec.args=""
@@ -14,18 +14,19 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class CleaningPipeline {
public class BatchPipeline {
public static void main(String[] args) throws IOException {
Properties props = new Properties();
props.load(StreamingPipeline.class.getResourceAsStream("/gcp.properties"));
String project = props.getProperty("gcp.project-id");
String bucketSrc = props.getProperty("streaming.read.file");
String bucketDest = props.getProperty("streaming.write.folder");
String bucket = props.getProperty("gcp.bucket");
String bucketSrc = String.format("gs://%s/%s", bucket, props.getProperty("streaming.read.file"));
String bucketDest = String.format("gs://%s/%s", bucket, props.getProperty("streaming.write.folder"));
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(project);
options.setStagingLocation(props.getProperty("gcp.dataflow.staging"));
options.setTempLocation(props.getProperty("gcp.dataflow.temp"));
options.setStagingLocation(String.format("gs://%s/%s", bucket, props.getProperty("gcp.dataflow.staging")));
options.setTempLocation(String.format("gs://%s/%s", bucket, props.getProperty("gcp.dataflow.temp")));
options.setRunner(DataflowRunner.class);
options.setStreaming(false);
@@ -53,6 +54,7 @@ public class CleaningPipeline {
TextIO.write().withoutSharding().to(bucketDest + "output-df.csv")
)
;
p.run().waitUntilFinish();
}
}
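
The added p.run().waitUntilFinish() also changes when the batch job returns control to the caller: run() only submits the pipeline to Dataflow, while waitUntilFinish() blocks until the job reaches a terminal state, so the mvn exec:java command above now exits only after the batch run has completed. A sketch of the same call with the terminal state inspected, which the commit itself does not do:

// Fragment: uses the Pipeline p built in BatchPipeline above;
// PipelineResult is org.apache.beam.sdk.PipelineResult.
PipelineResult result = p.run();                        // submits the job and returns
PipelineResult.State state = result.waitUntilFinish();  // blocks until DONE, FAILED or CANCELLED
System.out.println("Dataflow job finished in state: " + state);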
@@ -4,7 +4,7 @@ import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.*;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
@@ -32,12 +32,13 @@ public class StreamingPipeline {
Properties props = new Properties();
props.load(StreamingPipeline.class.getResourceAsStream("/gcp.properties"));
String project = props.getProperty("gcp.project-id");
String bucketDest = props.getProperty("streaming.write.folder");
String bucket = props.getProperty("gcp.bucket");
String bucketDest = String.format("gs://%s/%s", bucket, props.getProperty("streaming.write.folder"));
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(project);
options.setStagingLocation(props.getProperty("gcp.dataflow.staging"));
options.setTempLocation(props.getProperty("gcp.dataflow.temp"));
options.setStagingLocation(String.format("gs://%s/%s", bucket, props.getProperty("gcp.dataflow.staging")));
options.setTempLocation(String.format("gs://%s/%s", bucket, props.getProperty("gcp.dataflow.temp")));
options.setRunner(DataflowRunner.class);
options.setStreaming(true);
@@ -46,7 +47,9 @@ public class StreamingPipeline {
cr.registerCoderForClass(JSONObject.class, new JSONObjectCoder());
PCollection<JSONObject> coll = p
.apply("ReadFromPubsub",
PubsubIO.readMessagesWithAttributes().fromTopic("projects/" + project + "/topics/" + props.getProperty("gcp.pubsub.topic")))
PubsubIO.readMessagesWithAttributes().fromTopic("projects/" + project + "/topics/" + props.getProperty("gcp.pubsub.topic"))
// PubsubIO.readMessagesWithAttributes().fromSubscription("projects/" + project + "/subscriptions/" + props.getProperty("gcp.pubsub.subscription"))
)
.apply("ToJSON", MapElements
.into(TypeDescriptor.of(JSONObject.class))
.via((PubsubMessage msg) -> new JSONObject(new String(msg.getPayload())))
......
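
The commented-out fromSubscription call above, together with the new gcp.pubsub.subscription property below, sketches an alternative read path: fromTopic has the runner create and manage a subscription for the job, whereas fromSubscription attaches to an existing, named subscription whose backlog and acknowledgement state are independent of any single pipeline run. A sketch of that variant, reusing the p, project and props variables from StreamingPipeline:

// Fragment: same variables and imports as StreamingPipeline above.
PCollection<PubsubMessage> messages = p.apply("ReadFromPubsub",
        PubsubIO.readMessagesWithAttributes()
                .fromSubscription("projects/" + project + "/subscriptions/"
                        + props.getProperty("gcp.pubsub.subscription")));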
gcp.project-id=qwiklabs-gcp-56c3e61809c73e4d
gcp.bucket=bucket-for-codemotion
gcp.dataflow.staging=gs://bucket-for-codemotion/dataflow/staging/
gcp.dataflow.temp=gs://bucket-for-codemotion/dataflow/temp/
gcp.dataflow.staging=dataflow/staging/
gcp.dataflow.temp=dataflow/temp/
gcp.pubsub.topic=ad-impressions
streaming.write.folder=gs://bucket-for-codemotion/dataflow/output/
streaming.read.file=gs://bucket-for-codemotion/dataflow/train.csv
gcp.pubsub.subscription=ad-impressions
streaming.write.folder=adtech/output/
streaming.read.file=adtech/train.csv
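
With these values and the String.format calls above, the locations the pipelines actually use resolve to gs://bucket-for-codemotion/dataflow/staging/, gs://bucket-for-codemotion/dataflow/temp/, gs://bucket-for-codemotion/adtech/output/ and gs://bucket-for-codemotion/adtech/train.csv. Switching project or bucket now only means editing gcp.project-id and gcp.bucket rather than rewriting every path.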