EMMA Coverage Report (generated Tue Jul 10 07:50:22 IST 2012)
[all classes][org.wso2.siddhi.core.projector]

COVERAGE SUMMARY FOR SOURCE FILE [QueryProjector.java]

name | class, % | method, % | block, % | line, %
QueryProjector.java | 100% (1/1) | 89% (8/9) | 99% (315/319) | 95% (60/63)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class QueryProjector | 100% (1/1) | 89% (8/9) | 99% (315/319) | 95% (60/63)
process (List): void | | 0% (0/1) | 0% (0/1) | 0% (0/1)
process (AtomicEvent): void | | 100% (1/1) | 97% (97/100) | 88% (15/17)
QueryProjector (String, Projector, List): void | | 100% (1/1) | 100% (53/53) | 100% (12/12)
generateGroupByOutputAttributeGenerator (List, List): GroupByOutputAttributeG... | | 100% (1/1) | 100% (13/13) | 100% (3/3)
generateHavingExecutor (Condition): void | | 100% (1/1) | 100% (26/26) | 100% (5/5)
generateOutputAttributeGenerators (List, GroupByOutputAttributeGenerator): void | | 100% (1/1) | 100% (77/77) | 100% (15/15)
getOutputStreamDefinition (): StreamDefinition | | 100% (1/1) | 100% (3/3) | 100% (1/1)
process (ListEvent): void | | 100% (1/1) | 100% (42/42) | 100% (7/7)
setStreamJunction (StreamJunction): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)

1/*
2*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3*
4*  WSO2 Inc. licenses this file to you under the Apache License,
5*  Version 2.0 (the "License"); you may not use this file except
6*  in compliance with the License.
7*  You may obtain a copy of the License at
8*
9*    http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing,
12* software distributed under the License is distributed on an
13* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14* KIND, either express or implied.  See the License for the
15* specific language governing permissions and limitations
16* under the License.
17*/
18package org.wso2.siddhi.core.projector;
19 
20import org.wso2.siddhi.core.event.AtomicEvent;
21import org.wso2.siddhi.core.event.Event;
22import org.wso2.siddhi.core.event.ListEvent;
23import org.wso2.siddhi.core.event.StreamEvent;
24import org.wso2.siddhi.core.event.in.InEvent;
25import org.wso2.siddhi.core.event.in.InStream;
26import org.wso2.siddhi.core.event.remove.RemoveEvent;
27import org.wso2.siddhi.core.event.remove.RemoveStream;
28import org.wso2.siddhi.core.executor.conditon.ConditionExecutor;
29import org.wso2.siddhi.core.projector.attibute.generator.AbstractAggregateAttributeGenerator;
30import org.wso2.siddhi.core.projector.attibute.generator.SimpleOutputAttributeGenerator;
31import org.wso2.siddhi.core.projector.attibute.generator.groupby.GroupByOutputAttributeGenerator;
32import org.wso2.siddhi.core.stream.StreamJunction;
33import org.wso2.siddhi.core.util.parser.AggregatorParser;
34import org.wso2.siddhi.core.util.parser.ExecutorParser;
35import org.wso2.siddhi.query.api.condition.Condition;
36import org.wso2.siddhi.query.api.definition.StreamDefinition;
37import org.wso2.siddhi.query.api.expression.Variable;
38import org.wso2.siddhi.query.api.query.QueryEventStream;
39import org.wso2.siddhi.query.api.query.projection.Projector;
40import org.wso2.siddhi.query.api.query.projection.attribute.AggregationAttribute;
41import org.wso2.siddhi.query.api.query.projection.attribute.OutputAttribute;
42import org.wso2.siddhi.query.api.query.projection.attribute.SimpleAttribute;
43 
44import java.util.ArrayList;
45import java.util.List;
46 
47public class QueryProjector {
48 
49    //    private List<Object[]> dataList = new ArrayList<Object[]>();
50    private List<OutputAttributeGenerator> outputAttributeGeneratorList;
51    private List<OutputAttributeGenerator> aggregateOutputAttributeGeneratorList;
52    private int outputSize;
53    private String outputStreamId;
54    private StreamDefinition outputStreamDefinition = new StreamDefinition();
55    private StreamJunction outputStreamJunction;
56    private Projector projector;
57    private ConditionExecutor havingConditionExecutor = null;
58 
59    public QueryProjector(String outputStreamId, Projector projector,
60                          List<QueryEventStream> queryEventStreamList) {
61        this.outputStreamId = outputStreamId;
62        outputStreamDefinition.name(outputStreamId);
63        this.projector = projector;
64        outputSize = projector.getProjectionList().size();
65        outputAttributeGeneratorList = new ArrayList<OutputAttributeGenerator>(outputSize);
66        aggregateOutputAttributeGeneratorList = new ArrayList<OutputAttributeGenerator>(outputSize);
67        generateOutputAttributeGenerators(queryEventStreamList, generateGroupByOutputAttributeGenerator(projector.getGroupByList(), queryEventStreamList));
68        generateHavingExecutor(projector.getHavingCondition());
69 
70    }
71 
72    private void generateHavingExecutor(Condition condition) {
73        if (condition != null) {
74            List<QueryEventStream> queryEventStreamList = new ArrayList<QueryEventStream>();
75            queryEventStreamList.add(new QueryEventStream(outputStreamId, outputStreamId, outputStreamDefinition));
76            havingConditionExecutor = ExecutorParser.parseCondition(condition, queryEventStreamList, outputStreamId);
77        }
78    }
79 
80    private GroupByOutputAttributeGenerator generateGroupByOutputAttributeGenerator(
81            List<Variable> groupByList,
82            List<QueryEventStream> queryEventStreamList) {
83 
84        if (groupByList.size() > 0) {
85            return new GroupByOutputAttributeGenerator(projector.getGroupByList(), queryEventStreamList);
86        } else {
87            return null;
88        }
89    }
90 
91    private void generateOutputAttributeGenerators(List<QueryEventStream> queryEventStreamList,
92                                                   GroupByOutputAttributeGenerator groupByOutputAttributeGenerator) {
93        for (OutputAttribute outputAttribute : projector.getProjectionList()) {
94            if (outputAttribute instanceof SimpleAttribute) {
95                SimpleOutputAttributeGenerator attributeGenerator = new SimpleOutputAttributeGenerator(ExecutorParser.parseExpression(((SimpleAttribute) outputAttribute).getExpression(), queryEventStreamList, null));
96                outputAttributeGeneratorList.add(attributeGenerator);
97                outputStreamDefinition.attribute(outputAttribute.getRename(), attributeGenerator.getType());
98            } else {  //Aggregations
99                AbstractAggregateAttributeGenerator attributeGenerator = AggregatorParser.loadAggregatorClass(((AggregationAttribute) outputAttribute).getAggregationName());
100                if (groupByOutputAttributeGenerator != null) { //for group
101                    attributeGenerator = groupByOutputAttributeGenerator.createNewInstance().assignAggregateAttributeGenerator(attributeGenerator);
102                }
103                attributeGenerator.assignExpressions(((AggregationAttribute) outputAttribute).getExpressions(), queryEventStreamList);
104                outputAttributeGeneratorList.add(attributeGenerator);
105                aggregateOutputAttributeGeneratorList.add(attributeGenerator);
106                outputStreamDefinition.attribute(outputAttribute.getRename(), attributeGenerator.getType());
107            }
108        }
109 
110    }
111 
112 
113    public StreamDefinition getOutputStreamDefinition() {
114        return outputStreamDefinition;
115    }
116 
117    public void setStreamJunction(StreamJunction streamJunction) {
118        this.outputStreamJunction = streamJunction;
119    }
120 
121    public void process(AtomicEvent atomicEvent) {
122//        System.out.println("Arrived ");
123        Object[] data = new Object[outputSize];
124        for (int i = 0, outputAttributeGeneratorListSize = outputAttributeGeneratorList.size(); i < outputAttributeGeneratorListSize; i++) {
125            OutputAttributeGenerator outputAttributeGenerator = outputAttributeGeneratorList.get(i);
126            data[i] = outputAttributeGenerator.process(atomicEvent);
127        }
128        //   dataList.add(data);
129        try {
130            if (havingConditionExecutor == null) {
131                if (atomicEvent instanceof InStream) {
132                    outputStreamJunction.send(new InEvent(outputStreamId, atomicEvent.getTimeStamp(), data));
133                } else {
134                    outputStreamJunction.send(new RemoveEvent(outputStreamId, atomicEvent.getTimeStamp(), data,((RemoveStream)atomicEvent).getExpiryTime()));
135                }
136            } else {
137                StreamEvent event;
138                if (atomicEvent instanceof InStream) {
139                    event = new InEvent(outputStreamId, atomicEvent.getTimeStamp(), data);
140                } else {
141                    event = new RemoveEvent(outputStreamId, atomicEvent.getTimeStamp(), data,((RemoveStream)atomicEvent).getExpiryTime());
142                }
143                if (havingConditionExecutor.execute((AtomicEvent) event)) {
144                    outputStreamJunction.send(event);
145                }
146            }
147 
148        } catch (InterruptedException e) {
149            e.printStackTrace();  //todo handle
150        }
151 
152    }
153 
154    public void process(List<AtomicEvent> atomicEventList) {
155        //todo
156    }
157 
158    public void process(ListEvent listEvent) {
159        Event[] events = listEvent.getEvents();
160        for (int i = 0, iterateLength = events.length - 1; i < iterateLength; i++) {
161            for (OutputAttributeGenerator outputAttributeGenerator : aggregateOutputAttributeGeneratorList) {
162                outputAttributeGenerator.process(events[i]);
163            }
164        }
165        process(events[events.length - 1]);
166    }
167 
168}
169 

[all classes][org.wso2.siddhi.core.projector]
EMMA 2.1.5320 (stable) (C) Vladimir Roubtsov