EMMA Coverage Report (generated Tue Jul 10 07:50:22 IST 2012)
[all classes][org.wso2.siddhi.core]

COVERAGE SUMMARY FOR SOURCE FILE [SiddhiManager.java]

name | class, % | method, % | block, % | line, %
SiddhiManager.java | 100% (1/1) | 64% (7/11) | 80% (236/296) | 73% (41/56)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class SiddhiManager | 100% (1/1) | 64% (7/11) | 80% (236/296) | 73% (41/56)
addExecutionPlan (String): void | | 0% (0/1) | 0% (0/25) | 0% (0/6)
addQuery (String): void | | 0% (0/1) | 0% (0/5) | 0% (0/2)
defineStream (String): void | | 0% (0/1) | 0% (0/5) | 0% (0/2)
getQueryStreamDefinitionMap (List): Map | | 0% (0/1) | 0% (0/25) | 0% (0/5)
SiddhiManager (): void | | 100% (1/1) | 100% (36/36) | 100% (6/6)
addCallback (String, Callback): void | | 100% (1/1) | 100% (15/15) | 100% (4/4)
addQuery (Query): void | | 100% (1/1) | 100% (66/66) | 100% (12/12)
checkEventStream (StreamDefinition): void | | 100% (1/1) | 100% (31/31) | 100% (5/5)
defineStream (StreamDefinition): void | | 100% (1/1) | 100% (23/23) | 100% (5/5)
getInputHandler (String): InputHandler | | 100% (1/1) | 100% (10/10) | 100% (1/1)
initQuery (Query, List): void | | 100% (1/1) | 100% (55/55) | 100% (8/8)

1/*
2*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3*
4*  WSO2 Inc. licenses this file to you under the Apache License,
5*  Version 2.0 (the "License"); you may not use this file except
6*  in compliance with the License.
7*  You may obtain a copy of the License at
8*
9*    http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing,
12* software distributed under the License is distributed on an
13* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14* KIND, either express or implied.  See the License for the
15* specific language governing permissions and limitations
16* under the License.
17*/
18package org.wso2.siddhi.core;
19 
20import org.wso2.siddhi.core.event.in.StateEvent;
21import org.wso2.siddhi.core.exception.EventStreamAlreadyExistException;
22import org.wso2.siddhi.core.projector.QueryProjector;
23import org.wso2.siddhi.core.stream.StreamJunction;
24import org.wso2.siddhi.core.stream.output.Callback;
25import org.wso2.siddhi.core.stream.input.InputHandler;
26import org.wso2.siddhi.core.stream.recevier.StreamReceiver;
27import org.wso2.siddhi.core.util.parser.StreamParser;
28import org.wso2.siddhi.query.api.ExecutionPlan;
29import org.wso2.siddhi.query.api.definition.Attribute;
30import org.wso2.siddhi.query.api.definition.StreamDefinition;
31import org.wso2.siddhi.query.api.expression.Expression;
32import org.wso2.siddhi.query.api.query.Query;
33import org.wso2.siddhi.query.api.query.QueryEventStream;
34import org.wso2.siddhi.query.api.query.projection.attribute.OutputAttribute;
35import org.wso2.siddhi.query.api.query.projection.attribute.SimpleAttribute;
36import org.wso2.siddhi.query.compiler.SiddhiCompiler;
37import org.wso2.siddhi.query.compiler.exception.SiddhiPraserException;
38 
39import java.util.ArrayList;
40import java.util.HashMap;
41import java.util.List;
42import java.util.Map;
43import java.util.concurrent.LinkedBlockingQueue;
44import java.util.concurrent.ThreadPoolExecutor;
45import java.util.concurrent.TimeUnit;
46 
47public class SiddhiManager {
48 
49    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
50                                                                           Integer.MAX_VALUE,
51                                                                           50,
52                                                                           TimeUnit.MICROSECONDS,
53                                                                           new LinkedBlockingQueue<Runnable>());
54    Map<String, StreamJunction> streamJunctionMap = new HashMap<String, StreamJunction>(); //contains definition
55    Map<String, StreamDefinition> streamDefinitionMap = new HashMap<String, StreamDefinition>(); //contains definition
56    List<Query> queryList = new ArrayList<Query>();
57    LinkedBlockingQueue<StateEvent> inputQueue = new LinkedBlockingQueue<StateEvent>();
58 
59    public void defineStream(StreamDefinition streamDefinition) {
60        checkEventStream(streamDefinition);
61        StreamJunction streamJunction = new StreamJunction(streamDefinition);
62        streamJunctionMap.put(streamDefinition.getStreamId(), streamJunction);
63        streamDefinitionMap.put(streamDefinition.getStreamId(), streamDefinition);
64    }
65 
66    public void defineStream(String streamDefinition) throws SiddhiPraserException {
67        defineStream(SiddhiCompiler.parseStreamDefinition(streamDefinition));
68    }
69 
70    private void checkEventStream(StreamDefinition newStreamDefinition) {
71        StreamDefinition streamDefinition = streamDefinitionMap.get(newStreamDefinition.getStreamId());
72        if (streamDefinition != null) {
73            if (!streamDefinition.getAttributeList().equals(newStreamDefinition.getAttributeList())) {
74                throw new EventStreamAlreadyExistException(newStreamDefinition.getStreamId() + " is already defined as " + streamDefinition);
75            }
76        }
77    }
78 
79    public void addQuery(String query) throws SiddhiPraserException {
80        addQuery(SiddhiCompiler.parseQuery(query));
81    }
82 
83    public void addExecutionPlan(String addExecutionPlan) throws SiddhiPraserException {
84        for (ExecutionPlan executionPlan : SiddhiCompiler.parse(addExecutionPlan)) {
85            if (executionPlan instanceof StreamDefinition) {
86                defineStream((StreamDefinition) executionPlan);
87            } else {
88                addQuery((Query) executionPlan);
89            }
90        }
91    }
92 
93    public void addQuery(Query query) {
94 
95        List<QueryEventStream> queryEventStreamList = query.getInputStream().constructQueryEventStreamList(streamDefinitionMap, new ArrayList<QueryEventStream>());
96        initQuery(query, queryEventStreamList);
97 
98        QueryProjector queryProjector = new QueryProjector(query.getOutputStreamId(), query.getProjector(), queryEventStreamList);
99        StreamDefinition outputStreamDefinition = queryProjector.getOutputStreamDefinition();
100        defineStream(outputStreamDefinition);
101 
102        List<StreamReceiver> streamReceiverList = StreamParser.parseStream(query.getInputStream(), queryEventStreamList, queryProjector, threadPoolExecutor);
103        queryProjector.setStreamJunction(streamJunctionMap.get(outputStreamDefinition.getStreamId()));
104        for (StreamReceiver streamReceiver : streamReceiverList) {
105            StreamJunction streamJunction = streamJunctionMap.get(streamReceiver.getStreamId());
106            streamJunction.addEventFlow(streamReceiver);
107        }
108 
109    }
110 
111    private void initQuery(Query query, List<QueryEventStream> queryEventStreamList) {
112 
113        //   Map<String, StreamDefinition> queryStreamDefinitionMap = getQueryStreamDefinitionMap(query.getInputStream().getStreamIds());
114        //populate projection for * case
115        List<OutputAttribute> attributeList = query.getProjector().getProjectionList();
116        if (attributeList.size() == 0) {
117            for (QueryEventStream queryEventStream : queryEventStreamList) {
118                for (Attribute attribute : queryEventStream.getStreamDefinition().getAttributeList()) {
119                    attributeList.add(new SimpleAttribute(queryEventStream.getReferenceStreamId() + "_" + attribute.getName(), Expression.variable(queryEventStream.getReferenceStreamId(), attribute.getName())));
120                }
121            }
122        }
123 
124    }
125 
126    private Map<String, StreamDefinition> getQueryStreamDefinitionMap(List<String> streamIds) {
127        Map<String, StreamDefinition> map = new HashMap<String, StreamDefinition>();
128        for (String streamId : streamIds) {
129            map.put(streamId, streamDefinitionMap.get(streamId));
130        }
131        return map;
132    }
133 
134 
135//    private void checkQuery(Query query) {
136//        List<String> streamIds = query.getInputStream().getStreamIds();
137//        ////System.out.println(streamIds);
138//    }
139 
140    public InputHandler getInputHandler(String streamId) {
141        return new InputHandler(streamId, streamJunctionMap.get(streamId));
142    }
143 
144    public void addCallback(String streamId, Callback callback) {
145        callback.setStreamId(streamId);
146        callback.setThreadPoolExecutor(threadPoolExecutor);
147        streamJunctionMap.get(streamId).addEventFlow(callback);
148    }
149}

[all classes][org.wso2.siddhi.core]
EMMA 2.1.5320 (stable) (C) Vladimir Roubtsov