EMMA Coverage Report (generated Tue Jul 10 07:50:22 IST 2012)
[all classes][org.wso2.siddhi.core.util.parser]

COVERAGE SUMMARY FOR SOURCE FILE [StreamParser.java]

name | class, % | method, % | block, % | line, %
StreamParser.java | 100% (2/2) | 75% (3/4) | 97% (701/721) | 95% (127.9/134)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class StreamParser$1 | 100% (1/1) | 100% (1/1) | 89% (17/19) | 89% (0.9/1)
<static initializer> | | 100% (1/1) | 89% (17/19) | 89% (0.9/1)

class StreamParser | 100% (1/1) | 67% (2/3) | 97% (684/702) | 96% (128/134)
StreamParser (): void | | 0% (0/1) | 0% (0/3) | 0% (0/2)
parseStream (Stream, List, QueryProjector, ThreadPoolExecutor): List | | 100% (1/1) | 97% (562/577) | 96% (103/107)
parseStreamHandler (SingleStream, List, SingleStreamPacker): List | | 100% (1/1) | 100% (122/122) | 100% (25/25)

1/*
2*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3*
4*  WSO2 Inc. licenses this file to you under the Apache License,
5*  Version 2.0 (the "License"); you may not use this file except
6*  in compliance with the License.
7*  You may obtain a copy of the License at
8*
9*    http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing,
12* software distributed under the License is distributed on an
13* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14* KIND, either express or implied.  See the License for the
15* specific language governing permissions and limitations
16* under the License.
17*/
18package org.wso2.siddhi.core.util.parser;
19 
20import org.wso2.siddhi.core.executor.conditon.ConditionExecutor;
21import org.wso2.siddhi.core.projector.QueryProjector;
22import org.wso2.siddhi.core.statemachine.pattern.AndPatternState;
23import org.wso2.siddhi.core.statemachine.pattern.CountPatternState;
24import org.wso2.siddhi.core.statemachine.pattern.OrPatternState;
25import org.wso2.siddhi.core.statemachine.pattern.PatternState;
26import org.wso2.siddhi.core.statemachine.sequence.CountSequenceState;
27import org.wso2.siddhi.core.statemachine.sequence.OrSequenceState;
28import org.wso2.siddhi.core.statemachine.sequence.SequenceState;
29import org.wso2.siddhi.core.stream.StreamProcessor;
30import org.wso2.siddhi.core.stream.handler.StreamHandler;
31import org.wso2.siddhi.core.stream.handler.filter.FilterHandler;
32import org.wso2.siddhi.core.stream.handler.window.WindowHandler;
33import org.wso2.siddhi.core.stream.packer.SingleStreamPacker;
34import org.wso2.siddhi.core.stream.packer.join.JoinStreamPacker;
35import org.wso2.siddhi.core.stream.packer.join.LeftJoinStreamPacker;
36import org.wso2.siddhi.core.stream.packer.join.RightJoinStreamPacker;
37import org.wso2.siddhi.core.stream.packer.pattern.AndPatternStreamPacker;
38import org.wso2.siddhi.core.stream.packer.pattern.CountPatternStreamPacker;
39import org.wso2.siddhi.core.stream.packer.pattern.OrPatternStreamPacker;
40import org.wso2.siddhi.core.stream.packer.pattern.PatternStreamPacker;
41import org.wso2.siddhi.core.stream.packer.sequence.CountSequenceStreamPacker;
42import org.wso2.siddhi.core.stream.packer.sequence.OrSequenceStreamPacker;
43import org.wso2.siddhi.core.stream.packer.sequence.SequenceStreamPacker;
44import org.wso2.siddhi.core.stream.recevier.SingleStreamReceiver;
45import org.wso2.siddhi.core.stream.recevier.StreamReceiver;
46import org.wso2.siddhi.core.stream.recevier.pattern.AndPatternSingleStreamReceiver;
47import org.wso2.siddhi.core.stream.recevier.pattern.CountPatternSingleStreamReceiver;
48import org.wso2.siddhi.core.stream.recevier.pattern.OrPatternSingleStreamReceiver;
49import org.wso2.siddhi.core.stream.recevier.pattern.PatternSingleStreamReceiver;
50import org.wso2.siddhi.core.stream.recevier.pattern.PatternStreamReceiver;
51import org.wso2.siddhi.core.stream.recevier.sequence.CountSequenceSingleStreamReceiver;
52import org.wso2.siddhi.core.stream.recevier.sequence.OrSequenceSingleStreamReceiver;
53import org.wso2.siddhi.core.stream.recevier.sequence.SequenceSingleStreamReceiver;
54import org.wso2.siddhi.core.stream.recevier.sequence.SequenceStreamReceiver;
55import org.wso2.siddhi.query.api.condition.Condition;
56import org.wso2.siddhi.query.api.expression.Expression;
57import org.wso2.siddhi.query.api.query.QueryEventStream;
58import org.wso2.siddhi.query.api.stream.JoinStream;
59import org.wso2.siddhi.query.api.stream.SingleStream;
60import org.wso2.siddhi.query.api.stream.Stream;
61import org.wso2.siddhi.query.api.stream.handler.Handler;
62import org.wso2.siddhi.query.api.stream.pattern.PatternStream;
63import org.wso2.siddhi.query.api.stream.sequence.SequenceStream;
64 
65import java.util.ArrayList;
66import java.util.List;
67import java.util.concurrent.ThreadPoolExecutor;
68 
69public class StreamParser {
70 
71 
72    public static List<StreamReceiver> parseStream(Stream queryStream,
73                                                   List<QueryEventStream> queryEventStreamList,
74                                                   QueryProjector queryProjector,
75                                                   ThreadPoolExecutor threadPoolExecutor) {
76        List<StreamReceiver> streamReceiverList = new ArrayList<StreamReceiver>();
77        if (queryStream instanceof SingleStream) {
78            List<StreamProcessor> simpleStreamProcessorList = parseStreamHandler((SingleStream) queryStream, queryEventStreamList, new SingleStreamPacker());
79 
80            SingleStreamReceiver receiver = new SingleStreamReceiver((SingleStream) queryStream, simpleStreamProcessorList.get(0), threadPoolExecutor);
81 
82            SingleStreamPacker singleStreamPacker = (SingleStreamPacker) simpleStreamProcessorList.get(simpleStreamProcessorList.size() - 1);
83 
84            //singleStreamPacker next
85            singleStreamPacker.setNext(queryProjector);
86 
87            streamReceiverList.add(receiver);
88            return streamReceiverList;
89 
90        } else if (queryStream instanceof JoinStream) {
91            ConditionExecutor onConditionExecutor;
92            if (((JoinStream) queryStream).getOnCompare() != null) {
93                onConditionExecutor = ExecutorParser.parseCondition(((JoinStream) queryStream).getOnCompare(), queryEventStreamList, null);
94            } else {
95                onConditionExecutor = ExecutorParser.parseCondition(Condition.bool(Expression.value(true)), queryEventStreamList, null);
96            }
97            JoinStreamPacker leftJoinStreamPacker;
98            JoinStreamPacker rightJoinStreamPacker;
99            switch (((JoinStream) queryStream).getTrigger()) {
100                case LEFT:
101                    leftJoinStreamPacker = new LeftJoinStreamPacker(onConditionExecutor, true);
102                    rightJoinStreamPacker = new RightJoinStreamPacker(onConditionExecutor, false);
103                    break;
104                case RIGHT:
105                    leftJoinStreamPacker = new LeftJoinStreamPacker(onConditionExecutor, false);
106                    rightJoinStreamPacker = new RightJoinStreamPacker(onConditionExecutor, true);
107                    break;
108                default:
109                    leftJoinStreamPacker = new LeftJoinStreamPacker(onConditionExecutor, true);
110                    rightJoinStreamPacker = new RightJoinStreamPacker(onConditionExecutor, true);
111                    break;
112            }
113 
114            List<StreamProcessor> leftSimpleStreamProcessorList = parseStreamHandler((SingleStream) ((JoinStream) queryStream).getLeftStream(), queryEventStreamList, leftJoinStreamPacker);
115            List<StreamProcessor> rightSimpleStreamProcessorList = parseStreamHandler((SingleStream) ((JoinStream) queryStream).getRightStream(), queryEventStreamList, rightJoinStreamPacker);
116 
117            SingleStreamReceiver leftReceiver = new SingleStreamReceiver((SingleStream) ((JoinStream) queryStream).getLeftStream(), leftSimpleStreamProcessorList.get(0), threadPoolExecutor);
118            SingleStreamReceiver rightReceiver = new SingleStreamReceiver((SingleStream) ((JoinStream) queryStream).getRightStream(), rightSimpleStreamProcessorList.get(0), threadPoolExecutor);
119 
120            //joinStreamPacker next
121            leftJoinStreamPacker.setNext(queryProjector);
122            rightJoinStreamPacker.setNext(queryProjector);
123 
124 
125            //joinStreamPacker prev
126            JoinStreamPacker leftSingleStreamPacker = (JoinStreamPacker) leftSimpleStreamProcessorList.get(leftSimpleStreamProcessorList.size() - 1);
127            JoinStreamPacker rightSingleStreamPacker = (JoinStreamPacker) rightSimpleStreamProcessorList.get(rightSimpleStreamProcessorList.size() - 1);
128            rightJoinStreamPacker.setOppositeWindow(leftSingleStreamPacker.getWindow());
129            leftJoinStreamPacker.setOppositeWindow(rightSingleStreamPacker.getWindow());
130 
131            streamReceiverList.add(leftReceiver);
132            streamReceiverList.add(rightReceiver);
133            return streamReceiverList;
134 
135        } else if (queryStream instanceof PatternStream) {
136 
137            List<PatternState> patternStateList = StateParser.convertToPatternStateList(StateParser.identifyStates(((PatternStream) queryStream).getPatternElement()));
138            //    queryEventStreamList ;
139            // PatternStreamPacker patternStreamPacker = new PatternStreamPacker(stateList);
140            // PatternSingleStreamReceiver[] patternSingleStreamReceiverArray = new PatternSingleStreamReceiver[stateList.size()];
141            for (String streamId : queryStream.getStreamIds()) {
142 
143                //    List<SingleStream> streamList = new ArrayList<SingleStream>();
144                List<PatternSingleStreamReceiver> patternSingleStreamReceiverList = new ArrayList<PatternSingleStreamReceiver>();
145                for (PatternState state : patternStateList) {
146                    if (state.getSingleStream().getStreamId().equals(streamId)) {
147                        //           streamList.add(state.getSingleStream());
148                        PatternStreamPacker patternStreamPacker;
149                        if (state instanceof OrPatternState) {
150                            patternStreamPacker = new OrPatternStreamPacker(((OrPatternState) state));
151                        } else if (state instanceof AndPatternState) {
152                            patternStreamPacker = new AndPatternStreamPacker(((AndPatternState) state));
153                        } else if (state instanceof CountPatternState) {
154                            patternStreamPacker = new CountPatternStreamPacker((CountPatternState) state);
155                        } else {
156                            patternStreamPacker = new PatternStreamPacker(state);
157                        }
158                        List<StreamProcessor> simpleStreamProcessorList = parseStreamHandler((SingleStream) state.getSingleStream(), queryEventStreamList, patternStreamPacker);
159 
160                        PatternSingleStreamReceiver patternSingleStreamReceiver;
161 
162                        if (state instanceof OrPatternState) {
163                            patternSingleStreamReceiver = new OrPatternSingleStreamReceiver(((OrPatternState) state), simpleStreamProcessorList.get(0), patternStateList.size());
164                        } else if (state instanceof AndPatternState) {
165                            patternSingleStreamReceiver = new AndPatternSingleStreamReceiver(((AndPatternState) state), simpleStreamProcessorList.get(0), patternStateList.size());
166                        } else if (state instanceof CountPatternState) {
167                            patternSingleStreamReceiver = new CountPatternSingleStreamReceiver(((CountPatternState) state), simpleStreamProcessorList.get(0), patternStateList.size());
168                        } else {
169                            patternSingleStreamReceiver = new PatternSingleStreamReceiver(state, simpleStreamProcessorList.get(0), patternStateList.size());
170                        }
171 
172 
173                        state.setPatternSingleStreamReceiver(patternSingleStreamReceiver);
174                        patternSingleStreamReceiverList.add(patternSingleStreamReceiver);
175                        //  patternSingleStreamReceiverArray[state.getStateNumber()] = patternSingleStreamReceiver;
176 
177//                        PatternStreamPacker patternStreamPacker = (PatternStreamPacker) simpleStreamProcessorList.get(simpleStreamProcessorList.size() - 1);
178                        patternStreamPacker.setStreamReceiver(patternSingleStreamReceiver);
179                        patternStreamPacker.setNext(queryProjector);
180                        state.setPatternStreamPacker(patternStreamPacker);
181 
182                        //patternStreamPacker.setPrevious(singleStreamPacker); since not needed not set
183                    }
184                }
185 
186                PatternStreamReceiver receiver = new PatternStreamReceiver(streamId, patternSingleStreamReceiverList, threadPoolExecutor);
187                streamReceiverList.add(receiver);
188            }
189 
190            //   patternStreamPacker.setPatternSingleStreamReceiverArray(patternSingleStreamReceiverArray);
191            //patternStreamPacker next
192            //  patternStreamPacker.setNext(queryProjector, 0);
193 
194            for (PatternState state : patternStateList) {
195                state.getPatternSingleStreamReceiver().init();
196            }
197            return streamReceiverList;
198 
199        } else if (queryStream instanceof SequenceStream) {
200 
201 
202            List<SequenceState> sequenceStateList = StateParser.convertToSequenceStateList(StateParser.identifyStates(((SequenceStream) queryStream).getSequenceElement()));
203            //    queryEventStreamList ;
204            // PatternStreamPacker patternStreamPacker = new PatternStreamPacker(stateList);
205            // PatternSingleStreamReceiver[] patternSingleStreamReceiverArray = new PatternSingleStreamReceiver[stateList.size()];
206            for (String streamId : queryStream.getStreamIds()) {
207 
208                //    List<SingleStream> streamList = new ArrayList<SingleStream>();
209                List<SequenceSingleStreamReceiver> sequenceSingleStreamReceiverList = new ArrayList<SequenceSingleStreamReceiver>();
210                for (SequenceState state : sequenceStateList) {
211                    if (state.getSingleStream().getStreamId().equals(streamId)) {
212                        //           streamList.add(state.getSingleStream());
213                        SequenceStreamPacker sequenceStreamPacker;
214                        if (state instanceof OrSequenceState) {
215                            sequenceStreamPacker = new OrSequenceStreamPacker(((OrSequenceState) state));
216                        } else if (state instanceof CountSequenceState) {
217                            sequenceStreamPacker = new CountSequenceStreamPacker((CountSequenceState) state);
218                        } else {
219                            sequenceStreamPacker = new SequenceStreamPacker(state);
220                        }
221                        List<StreamProcessor> simpleStreamProcessorList = parseStreamHandler((SingleStream) state.getSingleStream(), queryEventStreamList, sequenceStreamPacker);
222 
223                        SequenceSingleStreamReceiver sequenceSingleStreamReceiver;
224 
225                        if (state instanceof OrSequenceState) {
226                            sequenceSingleStreamReceiver = new OrSequenceSingleStreamReceiver(((OrSequenceState) state), simpleStreamProcessorList.get(0), sequenceStateList.size());
227                        } else if (state instanceof CountSequenceState) {
228                            sequenceSingleStreamReceiver = new CountSequenceSingleStreamReceiver(((CountSequenceState) state), simpleStreamProcessorList.get(0), sequenceStateList.size());
229                        } else {
230                            sequenceSingleStreamReceiver = new SequenceSingleStreamReceiver(state, simpleStreamProcessorList.get(0), sequenceStateList.size());
231                        }
232 
233 
234                        state.setSequenceSingleStreamReceiver(sequenceSingleStreamReceiver);
235                        sequenceSingleStreamReceiverList.add(sequenceSingleStreamReceiver);
236                        //  patternSingleStreamReceiverArray[state.getStateNumber()] = patternSingleStreamReceiver;
237 
238//                        PatternStreamPacker patternStreamPacker = (PatternStreamPacker) simpleStreamProcessorList.get(simpleStreamProcessorList.size() - 1);
239                        sequenceStreamPacker.setStreamReceiver(sequenceSingleStreamReceiver);
240                        sequenceStreamPacker.setNext(queryProjector);
241                        state.setSequenceStreamPacker(sequenceStreamPacker);
242 
243                        //patternStreamPacker.setPrevious(singleStreamPacker); since not needed not set
244                    }
245                }
246 
247                SequenceStreamReceiver receiver = new SequenceStreamReceiver(streamId, sequenceSingleStreamReceiverList, threadPoolExecutor);
248                streamReceiverList.add(receiver);
249            }
250 
251            //   patternStreamPacker.setPatternSingleStreamReceiverArray(patternSingleStreamReceiverArray);
252            //patternStreamPacker next
253            //  patternStreamPacker.setNext(queryProjector, 0);
254 
255            for (SequenceState state : sequenceStateList) {
256                state.getSequenceSingleStreamReceiver().init();
257            }
258 
259            for (StreamReceiver streamReceiver : streamReceiverList) {
260                List<SequenceSingleStreamReceiver> otherStreamReceiverList = new ArrayList<SequenceSingleStreamReceiver>();
261                for (StreamReceiver otherStreamReceiver : streamReceiverList) {
262                    if (otherStreamReceiver != streamReceiver) {
263                        otherStreamReceiverList.addAll(((SequenceStreamReceiver) otherStreamReceiver).getSequenceSingleStreamReceiverList());
264                    }
265                }
266                ((SequenceStreamReceiver) streamReceiver).setOtherStreamReceivers(otherStreamReceiverList);
267            }
268            return streamReceiverList;
269        }
270        return streamReceiverList;
271 
272    }
273 
274    private static List<StreamProcessor> parseStreamHandler(SingleStream inputStream,
275                                                            List<QueryEventStream> queryEventStreamList,
276                                                            SingleStreamPacker singleStreamPacker) {
277        List<StreamProcessor> streamProcessorList = new ArrayList<StreamProcessor>();
278        List<Handler> handlerList = inputStream.getHandlerList();
279        for (int i = 0; i < handlerList.size(); i++) {
280            Handler handler = handlerList.get(i);
281            StreamHandler streamHandler = null;
282            if (handler.getType() == Handler.Type.FILTER) {
283                if (handler.getName() == null) {   //default filter
284                    Condition condition = (Condition) handler.getParameters()[0];
285                    streamHandler = new FilterHandler(ExecutorParser.parseCondition(condition, queryEventStreamList, inputStream.getStreamReferenceId()), queryEventStreamList);
286 
287                }
288            } else if (handler.getType() == Handler.Type.WIN) {
289                WindowHandler windowHandler = (WindowHandler) org.wso2.siddhi.core.util.ClassLoader.loadClass("org.wso2.siddhi.core.stream.handler.window." + handler.getName().substring(0, 1).toUpperCase() + handler.getName().substring(1) + "WindowHandler");
290//                    WindowHandler windowHandler = new TimeWindowHandler();
291                windowHandler.setParameters(handler.getParameters());
292                streamHandler = windowHandler;
293            }
294 
295            if (streamProcessorList.size() > 0) {
296                StreamHandler prevStreamHandler = (StreamHandler) streamProcessorList.get(i - 1);
297                prevStreamHandler.setNext(streamHandler);
298                streamHandler.setPrevious(prevStreamHandler);
299            }
300            streamProcessorList.add(streamHandler);
301 
302        }
303        if (streamProcessorList.size() > 0) {
304            StreamHandler lastStreamHandler = (StreamHandler) streamProcessorList.get(streamProcessorList.size() - 1);
305            lastStreamHandler.setNext(singleStreamPacker);
306            singleStreamPacker.setPrevious(lastStreamHandler);
307 
308        }
309        streamProcessorList.add(singleStreamPacker);
310        return streamProcessorList;
311    }
312 
313}

[all classes][org.wso2.siddhi.core.util.parser]
EMMA 2.1.5320 (stable) (C) Vladimir Roubtsov