1 | /* |
2 | * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. |
3 | * |
4 | * WSO2 Inc. licenses this file to you under the Apache License, |
5 | * Version 2.0 (the "License"); you may not use this file except |
6 | * in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, |
12 | * software distributed under the License is distributed on an |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | * KIND, either express or implied. See the License for the |
15 | * specific language governing permissions and limitations |
16 | * under the License. |
17 | */ |
18 | package org.wso2.siddhi.core.stream.handler.filter; |
19 | |
20 | import org.wso2.siddhi.core.event.AtomicEvent; |
21 | import org.wso2.siddhi.core.event.Event; |
22 | import org.wso2.siddhi.core.event.ComplexEvent; |
23 | import org.wso2.siddhi.core.event.ListEvent; |
24 | import org.wso2.siddhi.core.event.StreamEvent; |
25 | import org.wso2.siddhi.core.executor.conditon.ConditionExecutor; |
26 | import org.wso2.siddhi.core.util.SchedulerQueue; |
27 | import org.wso2.siddhi.core.stream.StreamElement; |
28 | import org.wso2.siddhi.core.stream.StreamProcessor; |
29 | import org.wso2.siddhi.core.stream.handler.StreamHandler; |
30 | import org.wso2.siddhi.query.api.query.QueryEventStream; |
31 | |
32 | import java.util.ArrayList; |
33 | import java.util.List; |
34 | |
35 | public class FilterHandler implements StreamHandler { |
36 | private ConditionExecutor conditionExecutor; |
37 | private List<QueryEventStream> queryEventStreamList; |
38 | private StreamProcessor nextPreStreamFlowProcessor; |
39 | private StreamElement prevStreamElement; |
40 | |
41 | public FilterHandler(ConditionExecutor conditionExecutor, |
42 | List<QueryEventStream> queryEventStreamList) { |
43 | this.conditionExecutor = conditionExecutor; |
44 | this.queryEventStreamList = queryEventStreamList; |
45 | } |
46 | |
47 | @Override |
48 | public void process(ComplexEvent complexEvent) { |
49 | // //System.out.println("FILTER "+complexEvent); |
50 | if (complexEvent instanceof ListEvent) { |
51 | List<Event> list = new ArrayList<Event>(); |
52 | for (Event event : ((ListEvent) complexEvent).getEvents()) { |
53 | if (conditionExecutor.execute(event)) { |
54 | list.add(event); |
55 | } |
56 | } |
57 | if (list.size() > 0) { |
58 | ((ListEvent) complexEvent).setEvents(list.toArray(new Event[list.size()])); |
59 | nextPreStreamFlowProcessor.process(complexEvent); |
60 | } |
61 | } else if (conditionExecutor.execute((AtomicEvent) complexEvent)) { |
62 | nextPreStreamFlowProcessor.process(complexEvent); |
63 | } |
64 | } |
65 | |
66 | @Override |
67 | public void setNext(StreamProcessor nextPreStreamFlowProcessor) { |
68 | this.nextPreStreamFlowProcessor = nextPreStreamFlowProcessor; |
69 | } |
70 | |
71 | @Override |
72 | public void setPrevious(StreamElement streamElement) { |
73 | this.prevStreamElement = streamElement; |
74 | } |
75 | |
76 | @Override |
77 | public String getStreamId() { |
78 | return prevStreamElement.getStreamId(); |
79 | } |
80 | |
81 | @Override |
82 | public SchedulerQueue<StreamEvent> getWindow() { |
83 | return prevStreamElement.getWindow(); |
84 | } |
85 | } |