Flink入門訓練--以New York City Taxi為例
最近在學Flink,準備用Flink搭建一個實時的推薦系統。找到一個好的ofollow,noindex" target="_blank">網站 (也算作是flink創始者的官方網站),上面有關於Flink的上手教程,用來練練手,熟悉熟悉,下文僅僅是我的筆記。
1. 資料集
網站New York City Taxi & Limousine Commission 提供了關於紐約市從2009-1015年關於計程車駕駛的公共資料集。
具體資料下載方法,可見# Taxi Data Streams ,下載完資料後,不要解壓縮。
我們的第一個資料集包含紐約市的計程車出行的資訊,每一次出行包含兩個事件:START和END,可以分別理解為開始和結束該行程。每一個事件又包括11個屬性,詳細介紹如下:
taxiId: Long// a unique id for each taxi driverId: Long// a unique id for each driver isStart: Boolean// TRUE for ride start events, FALSE for ride end events startTime: DateTime// the start time of a ride endTime: DateTime// the end time of a ride, //"1970-01-01 00:00:00" for start events startLon: Float// the longitude of the ride start location startLat: Float// the latitude of the ride start location endLon: Float// the longitude of the ride end location endLat: Float// the latitude of the ride end location passengerCnt: Short// number of passengers on the ride
另一個數據集包含計程車的費用資訊,與每一次行程對應:
taxiId: Long// a unique id for each taxi driverId: Long// a unique id for each driver startTime: DateTime// the start time of a ride paymentType: String// CSH or CRD tip: Float// tip(小費) for this ride tolls: Float// tolls for this ride totalFare: Float// total fare collected
2. 生成資料流
首先定義TaxiRide事件,即資料集中的每一個record。
我們使用Flink的source函式(TaxiRideSource)讀取TaxiRide流,這個source是基於事件時間
進行的。同樣的,費用事件TaxiFare的流通過函式TaxiFareSource進行傳送。為了讓生成的流更加真實,事件傳送的時間是與timestamp成比例的。兩個真實相隔十分鐘發生的事件在流中也相差十分鐘。此外,我們可以定義一個變數speed-up factor
為60,該變數為加速因子,那麼真實事件中的一分鐘在流中只有1秒鐘,縮短60倍嘛。不僅如此,我們還可以定義最大服務延時,這個延時使得每個事件在最大服務延時之內隨機出現,這麼做的目的是讓這個流的事件產生與在real-world發生的不確定性更接近。
對於這個應用,我們設定speed-up factor
為600(即10分鐘相當於1秒),以及最大延時時間為60。
所有的行動都應使用事件時間(event time) (相對於處理時間(processing time))來實現。
Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.
事件時間(event time)將程式語義與服務速度分離開,即使在歷史資料或無序傳送的資料的情況下也能保證一致的結果。簡單來說就是,在資料處理的過程中,依賴的時間跟在流中出現的時間無關,只跟該事件發生的時間有關。
private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception { // 設定服務開始時間servingStartTime long servingStartTime = Calendar.getInstance().getTimeInMillis(); // 資料開始時間dataStartTime,即第一個ride的timestamp long dataStartTime; Random rand = new Random(7452); // 使用優先佇列進行emit,其比較方式為他們的等待時間 PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>( 32, new Comparator<Tuple2<Long, Object>>() { @Override public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) { return o1.f0.compareTo(o2.f0); } }); // 讀取第一個ride,並將第一個ride插入到schedule裡 String line; TaxiRide ride; if (reader.ready() && (line = reader.readLine()) != null) { // read first ride ride = TaxiRide.fromString(line); // extract starting timestamp dataStartTime = getEventTime(ride); // get delayed time,這個delayedtime是dataStartTime加一個隨機數,隨機數有最大範圍,用來模擬真實世界情況 long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand); // 將ride插入到schedule裡 emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride)); // 設定水印時間 long watermarkTime = dataStartTime + watermarkDelayMSecs; // 下一個水印時間是時間戳是 watermarkTime - maxDelayMsecs - 1 // 只能證明,這個時間一定是小於dataStartTime的Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1); // 將該水印放入Schedule,且這個水印被優先佇列移到了ride之前 emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark)); } else { return; } // 從檔案裡讀取下一個ride(peek) if (reader.ready() && (line = reader.readLine()) != null) { ride = TaxiRide.fromString(line); } // read rides one-by-one and emit a random ride from the buffer each time while (emitSchedule.size() > 0 || reader.ready()) { // insert all events into schedule that might be emitted next // 在Schedule裡的下一個事件的延時後時間long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1; // 當前從檔案讀取的ride的事件時間 long rideEventTime = ride != null ? getEventTime(ride) : -1; // 這個while迴圈用來進行當前Schedule為空的情況 while( ride != null && ( // while there is a ride AND emitSchedule.isEmpty() || // and no ride in schedule OR rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule ) { // insert event into emit schedule long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand); emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride)); // read next ride if (reader.ready() && (line = reader.readLine()) != null) { ride = TaxiRide.fromString(line); rideEventTime = getEventTime(ride); } else { ride = null; rideEventTime = -1; } } // 提取Schedule裡的第一個ride,叫做head Tuple2<Long, Object> head = emitSchedule.poll(); // head應該要到達的時間 long delayedEventTime = head.f0; long now = Calendar.getInstance().getTimeInMillis(); // servingTime = servingStartTime + (delayedEventTime - dataStartTime)/ this.servingSpeed long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime); // 應該再等多久,才讓這個ride發生呢?(哈哈,我好喜歡這個描述) long waitTime = servingTime - now; // 既然要等,那就睡著等吧 Thread.sleep( (waitTime > 0) ? waitTime : 0); // 如果這個head是一個TaxiRide if(head.f1 instanceof TaxiRide) { TaxiRide emitRide = (TaxiRide)head.f1; // emit ride sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide)); } // 如果這個head是一個水印標誌 else if(head.f1 instanceof Watermark) { Watermark emitWatermark = (Watermark)head.f1; // emit watermark sourceContext.emitWatermark(emitWatermark); // 並設定下一個水印標誌到Schedule中 long watermarkTime = delayedEventTime + watermarkDelayMSecs; // 同樣,保證這個水印的時間戳在下一個ride的timestamp之前 Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1); emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark)); } } }
那麼,如何在java中執行這些sources,下面是一個示例:
// get an ExecutionEnvironment StreamExecutionEnvironment env = StreamExcutionEnvironment.getExecutionEnvironment(); // configure event-time processing env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // get the taxi ride data stream DataStream<TaxiRide> rides = env.addSource( new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));
另外,有一些應用需要我們使用加入檢查點的機制。檢查點(checkpoint)是從failure中恢復的一種機制。他也需要建立CheckpointedTaxiRideSource來在流中執行。
3. 資料清洗:bath:
3.1 資料連線:link:
由於我們的應用要研究的是在紐約市內的計程車情況,所以我們要排除掉紐約市外的地點。通過這個過濾器:
private static class NYCFilter implements FilterFunction<TaxiRide> { @Override public boolean filter(TaxiRide taxiRide) throws Exception { return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat); } }
執行過濾器:
// start the data generator DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor))); DataStream<TaxiRide> filteredRides = rides // filter out rides that do not start or stop in NYC .filter(new NYCFilter());
現在我們需要把TaxiRide和TaxiFare兩者的資料記錄結合。在這個過程中,我們要同時處理兩個source的流資料。這裡介紹幾個用到的Transformation functions:
- FlatMap : 輸入1個record,輸出為0或1或更多個records的對映
- Filter :進行評估,如果結果為Ture,則傳輸record
- KeyBy :用來將記錄按照第一個元素(一個字串)進行分組,根據該key將資料進行重新分割槽,然後將記錄再發送給下一個運算元
由於我們沒辦法控制ride和fare到達的先後,所以我們儲存先到的資訊直到和他匹配的資訊到來。這就需要用到有狀態的計算
public class RidesAndFaresExercise extends ExerciseBase { public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); final String ridesFile = params.get("rides", pathToRideData); final String faresFile = params.get("fares", pathToFareData); final int delay = 60; // at most 60 seconds of delay final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(ExerciseBase.parallelism); DataStream<TaxiRide> rides = env .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor))) .filter((TaxiRide ride) -> ride.isStart) .keyBy("rideId"); DataStream<TaxiFare> fares = env .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor))) .keyBy("rideId"); DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides .connect(fares) .flatMap(new EnrichmentFunction()); printOrTest(enrichedRides); env.execute("Join Rides with Fares (java RichCoFlatMap)"); } public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> { // keyed, managed state private ValueState<TaxiRide> rideState; private ValueState<TaxiFare> fareState; @Override public void open(Configuration config) throws Exception { rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class)); fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class)); } @Override public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { TaxiFare fare = fareState.value(); if (fare != null) { fareState.clear(); out.collect(new Tuple2(ride, fare)); } else { rideState.update(ride); } } @Override public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { TaxiRide ride = rideState.value(); if (ride != null){ rideState.clear(); out.collect(new Tuple2(ride, fare)); } else { fareState.update(fare); } } } }
執行,可以看到,生成的資料是這樣的,ride和fare結合到了一起:
3> (196965,START,2013-01-01 11:54:08,1970-01-01 00:00:00,-73.99048,40.75611,-73.98388,40.767143,2,2013007021,2013014447,196965,2013007021,2013014447,2013-01-01 11:54:08,CSH,0.0,0.0,6.5) 1> (197311,START,2013-01-01 11:55:44,1970-01-01 00:00:00,-73.98894,40.72127,-73.95267,40.771126,1,2013008802,2013012009,197311,2013008802,2013012009,2013-01-01 11:55:44,CRD,2.7,0.0,16.2) 2> (196608,START,2013-01-01 11:53:00,1970-01-01 00:00:00,-73.97817,40.761055,-73.98574,40.75613,2,2013004060,2013014162,196608,2013004060,2013014162,2013-01-01 11:53:00,CSH,0.0,0.0,5.5)
3.2 狀態快取清理
那麼現在,我們想要上面的兩者結合操作更加的Robust。對於現實中的資料,有時某些record會丟失,這意味著我們可能只收到TaxiRide
andTaxiFare
中的一個,另一個永遠不會到。所以先到的那個record會一直佔用著記憶體。為了解決這個問題,我們嘗試在CoProcessFunction中清理掉沒有被匹配的狀態。
這個功能定義在類ExpiringStateExercise
中:
首先給出missing data的輸入,這裡我們丟掉所有ride的END事件,START事件每隔1000個丟失一個。:hushed:
DataStream<TaxiRide> rides = env .addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))) .filter((TaxiRide ride) -> (ride.isStart && (ride.rideId % 1000 != 0))) .keyBy(ride -> ride.rideId); SingleOutputStreamOperator processed = rides .connect(fares) // Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream. // The function will be called for every element in the input streams and can produce zero or more output elements. .process(new EnrichmentFunction());
我們使用CoprocessingFunction 來進行上面描述的操作。對於有兩個inputs的流來說,下面的描述生動形象的介紹了我們需要override的3個方法:
For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
processElement1(...)
&processElement2(...)
用於兩個資料流的call。onTimer()
用於設定拋棄掉沒有尋到匹配的record的動作。
@Override // Called when a timer set using TimerService fires. public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { if (fareState.value() != null) { ctx.output(unmatchedFares, fareState.value()); fareState.clear(); } if (rideState.value() != null) { ctx.output(unmatchedRides, rideState.value()); rideState.clear(); } } @Override // A Context that allows querying the timestamp of the element, // querying the TimeDomain of the firing timer and getting a TimerService for registering timers and querying the time. // The context is only valid during the invocation of this method, do not store it. public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { // 當前處理事件是ride,且當前狀態中fare為非空, 則輸出。 // (由於ride在之前已經被keyby()過,這裡只會傳送跟fare相同rideId的ride)TaxiFare fare = fareState.value(); if (fare != null) { fareState.clear(); out.collect(new Tuple2(ride, fare)); } else { // 否則,更新rideState rideState.update(ride); // 只要水印到達,我們就停止等待相應的fare // Registers a timer to be fired when the event time watermark passes the given time. context.timerService().registerEventTimeTimer(ride.getEventTime()); } }
輸出結果如下,可以看到輸出的內容的時間戳都相差1000,跟之前定義的一致。
1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3 3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5 4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38
3.3 視窗
現在,我們想確定每小時獲得最多小費(tip)的駕駛員(每一條fare的record裡有小費這一欄)。 最簡單的方法是分兩步:首先使用一小時長的時間視窗(time window)來計算每小時內每個駕駛員的總提示,然後從該視窗流的結果中找到每小時獲得最多小費的駕駛員。
我們在下列code中會遇到以下幾個問題:
AggregareFunction: 這個函式有一個將輸入元素加到accumulator的方法。首先,這個函式介面有一個初始化accumulator的方法,並且可以將兩個accumulators融合成一個,不僅如此還可以從accumulator中提取出output。
ProcessWindowFunction:這個函式輸入一個包含視窗的所有元素的可迭代的集合以及一個包含time和state的Context object,這些輸入能夠使他提供更加豐富的功能。
public class HourlyTipsExercise extends ExerciseBase { public static void main(String[] args) throws Exception { // read parameters ParameterTool params = ParameterTool.fromArgs(args); final String input = params.get("input", ExerciseBase.pathToFareData); final int maxEventDelay = 60; // events are out of order by max 60 seconds final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(ExerciseBase.parallelism); // start the data generator DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor))); // compute tips per hour for each driver DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares // 根據driveId 進行分組 .keyBy((TaxiFare fare) -> fare.driverId) // 設定視窗時間為1小時 .timeWindow(Time.hours(1)) // AddTips()為aggFunction, WrapWithWindowInfo()為windowFunction .aggregate(new AddTips(), new WrapWithWindowInfo()); // find the highest total tips in each hour DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips .timeWindowAll(Time.hours(1)) .maxBy(2); printOrTest(hourlyMax); // execute the transformation pipeline env.execute("Hourly Tips (java)"); } /* Adds up the tips. */ public static class AddTips implements AggregateFunction< TaxiFare, // input type Float, // accumulator type Float// output type > { @Override public Float createAccumulator() { return 0F; } @Override public Float add(TaxiFare fare, Float aFloat) { return fare.tip + aFloat; } @Override public Float getResult(Float aFloat) { return aFloat; } @Override public Float merge(Float aFloat, Float accumulator) { return aFloat + accumulator; } } /* * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. */ public static class WrapWithWindowInfo extends ProcessWindowFunction< Float, Tuple3<Long, Long, Float>, Long, TimeWindow> { @Override public void process(Long key, Context context, Iterable<Float> elements,Collector<Tuple3<Long, Long, Float>> out) throws Exception { Float sumOfTips = elements.iterator().next(); out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips)); } } }
以下是輸出結果:
1> (1357002000000,2013000493,54.45) 2> (1357005600000,2013010467,64.53) 3> (1357009200000,2013010589,104.75)
4. Broadcast State
廣播變數(Broadcast State):這種機制用來支援資料從需要上游任務廣播傳送到下游任務的事件。
這篇文章對廣播變數講的很詳細:# A Practical Guide to Broadcast State in Apache Flink
在這個機制中,我們將系統分為actions stream和pattern stream。actions stream即為正常的資料流,也就是例子中 rides。pattern為我們廣播的資料流,這裡可以理解為我們的監聽室需要對rides進行監聽,即我們傳輸一個pattern到broadcast state中,然後operator打印出action stream中符合這個pattern的資料。
在這裡,我們的pattern是一個interger n,代表分鐘數。我們想要列印的是在我們傳送這個pattern的時刻,所有已經開始了n分鐘且還沒有結束的rides。
接下來是他的應用程式碼:
首先,在這個簡單的例子中,我們需要一個廣播變數描述符,但是並不用他儲存東西。
final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>( "dummy", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO );
然後,設定一個socket介面,用來接收pattern:
BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999) .assignTimestampsAndWatermarks(new QueryStreamAssigner()) .broadcast(dummyBroadcastState);
當我們得到按照rideId分組後的rides stream以及從socket返回的分鐘n的broadcast stream後,我們連線這兩個streams。然後將它傳送到QueryFunction()
處理。QueryFunction
將pattern(也就是socket返回的分鐘數n)與ride進行匹配,最後返回被匹配的rides。
DataStream<TaxiRide> reports = rides .keyBy((TaxiRide ride) -> ride.taxiId) .connect(queryStream) .process(new QueryFunction()); public static class QueryFunction extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> { private ValueStateDescriptor<TaxiRide> taxiDescriptor =new ValueStateDescriptor<>("saved ride", TaxiRide.class); private ValueState<TaxiRide> taxiState; @Override public void open(Configuration config) { // 得到每一個taxi的上一個事件的狀態 taxiState = getRuntimeContext().getState(taxiDescriptor); } @Override public void processElement(TaxiRide ride, ReadOnlyContext ctx, Collector< TaxiRide> out) throws Exception { // For every taxi, let's store the most up-to-date information. // TaxiRide implements Comparable to make this easy.TaxiRide savedRide = taxiState.value(); if (ride.compareTo(savedRide) > 0) { taxiState.update(ride); } } @Override public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) throws Exception { DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC(); Long thresholdInMinutes = Long.valueOf(msg); Long wm = ctx.currentWatermark(); System.out.println("QUERY: " + thresholdInMinutes + " minutes at " + timeFormatter.print(wm)); // Collect to the output all ongoing rides that started at least thresholdInMinutes ago. ctx.applyToKeyedState(taxiDescriptor, new KeyedStateFunction<Long, ValueState<TaxiRide>>() { @Override public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception { TaxiRide ride = taxiState.value(); if (ride.isStart) { long minutes = (wm - ride.getEventTime()) / 60000; if (ride.isStart && (minutes >= thresholdInMinutes)) { out.collect(ride); } } } }); } }
Reference:
- data Artisans
- 《Flink基礎教程》
- 《Learning Apache Flink》