1 | /* |
2 | * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. |
3 | * |
4 | * WSO2 Inc. licenses this file to you under the Apache License, |
5 | * Version 2.0 (the "License"); you may not use this file except |
6 | * in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, |
12 | * software distributed under the License is distributed on an |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | * KIND, either express or implied. See the License for the |
15 | * specific language governing permissions and limitations |
16 | * under the License. |
17 | */ |
18 | package org.wso2.siddhi.core; |
19 | |
20 | import org.wso2.siddhi.core.event.in.StateEvent; |
21 | import org.wso2.siddhi.core.exception.EventStreamAlreadyExistException; |
22 | import org.wso2.siddhi.core.projector.QueryProjector; |
23 | import org.wso2.siddhi.core.stream.StreamJunction; |
24 | import org.wso2.siddhi.core.stream.output.Callback; |
25 | import org.wso2.siddhi.core.stream.input.InputHandler; |
26 | import org.wso2.siddhi.core.stream.recevier.StreamReceiver; |
27 | import org.wso2.siddhi.core.util.parser.StreamParser; |
28 | import org.wso2.siddhi.query.api.ExecutionPlan; |
29 | import org.wso2.siddhi.query.api.definition.Attribute; |
30 | import org.wso2.siddhi.query.api.definition.StreamDefinition; |
31 | import org.wso2.siddhi.query.api.expression.Expression; |
32 | import org.wso2.siddhi.query.api.query.Query; |
33 | import org.wso2.siddhi.query.api.query.QueryEventStream; |
34 | import org.wso2.siddhi.query.api.query.projection.attribute.OutputAttribute; |
35 | import org.wso2.siddhi.query.api.query.projection.attribute.SimpleAttribute; |
36 | import org.wso2.siddhi.query.compiler.SiddhiCompiler; |
37 | import org.wso2.siddhi.query.compiler.exception.SiddhiPraserException; |
38 | |
39 | import java.util.ArrayList; |
40 | import java.util.HashMap; |
41 | import java.util.List; |
42 | import java.util.Map; |
43 | import java.util.concurrent.LinkedBlockingQueue; |
44 | import java.util.concurrent.ThreadPoolExecutor; |
45 | import java.util.concurrent.TimeUnit; |
46 | |
47 | public class SiddhiManager { |
48 | |
49 | private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), |
50 | Integer.MAX_VALUE, |
51 | 50, |
52 | TimeUnit.MICROSECONDS, |
53 | new LinkedBlockingQueue<Runnable>()); |
54 | Map<String, StreamJunction> streamJunctionMap = new HashMap<String, StreamJunction>(); //contains definition |
55 | Map<String, StreamDefinition> streamDefinitionMap = new HashMap<String, StreamDefinition>(); //contains definition |
56 | List<Query> queryList = new ArrayList<Query>(); |
57 | LinkedBlockingQueue<StateEvent> inputQueue = new LinkedBlockingQueue<StateEvent>(); |
58 | |
59 | public void defineStream(StreamDefinition streamDefinition) { |
60 | checkEventStream(streamDefinition); |
61 | StreamJunction streamJunction = new StreamJunction(streamDefinition); |
62 | streamJunctionMap.put(streamDefinition.getStreamId(), streamJunction); |
63 | streamDefinitionMap.put(streamDefinition.getStreamId(), streamDefinition); |
64 | } |
65 | |
66 | public void defineStream(String streamDefinition) throws SiddhiPraserException { |
67 | defineStream(SiddhiCompiler.parseStreamDefinition(streamDefinition)); |
68 | } |
69 | |
70 | private void checkEventStream(StreamDefinition newStreamDefinition) { |
71 | StreamDefinition streamDefinition = streamDefinitionMap.get(newStreamDefinition.getStreamId()); |
72 | if (streamDefinition != null) { |
73 | if (!streamDefinition.getAttributeList().equals(newStreamDefinition.getAttributeList())) { |
74 | throw new EventStreamAlreadyExistException(newStreamDefinition.getStreamId() + " is already defined as " + streamDefinition); |
75 | } |
76 | } |
77 | } |
78 | |
79 | public void addQuery(String query) throws SiddhiPraserException { |
80 | addQuery(SiddhiCompiler.parseQuery(query)); |
81 | } |
82 | |
83 | public void addExecutionPlan(String addExecutionPlan) throws SiddhiPraserException { |
84 | for (ExecutionPlan executionPlan : SiddhiCompiler.parse(addExecutionPlan)) { |
85 | if (executionPlan instanceof StreamDefinition) { |
86 | defineStream((StreamDefinition) executionPlan); |
87 | } else { |
88 | addQuery((Query) executionPlan); |
89 | } |
90 | } |
91 | } |
92 | |
93 | public void addQuery(Query query) { |
94 | |
95 | List<QueryEventStream> queryEventStreamList = query.getInputStream().constructQueryEventStreamList(streamDefinitionMap, new ArrayList<QueryEventStream>()); |
96 | initQuery(query, queryEventStreamList); |
97 | |
98 | QueryProjector queryProjector = new QueryProjector(query.getOutputStreamId(), query.getProjector(), queryEventStreamList); |
99 | StreamDefinition outputStreamDefinition = queryProjector.getOutputStreamDefinition(); |
100 | defineStream(outputStreamDefinition); |
101 | |
102 | List<StreamReceiver> streamReceiverList = StreamParser.parseStream(query.getInputStream(), queryEventStreamList, queryProjector, threadPoolExecutor); |
103 | queryProjector.setStreamJunction(streamJunctionMap.get(outputStreamDefinition.getStreamId())); |
104 | for (StreamReceiver streamReceiver : streamReceiverList) { |
105 | StreamJunction streamJunction = streamJunctionMap.get(streamReceiver.getStreamId()); |
106 | streamJunction.addEventFlow(streamReceiver); |
107 | } |
108 | |
109 | } |
110 | |
111 | private void initQuery(Query query, List<QueryEventStream> queryEventStreamList) { |
112 | |
113 | // Map<String, StreamDefinition> queryStreamDefinitionMap = getQueryStreamDefinitionMap(query.getInputStream().getStreamIds()); |
114 | //populate projection for * case |
115 | List<OutputAttribute> attributeList = query.getProjector().getProjectionList(); |
116 | if (attributeList.size() == 0) { |
117 | for (QueryEventStream queryEventStream : queryEventStreamList) { |
118 | for (Attribute attribute : queryEventStream.getStreamDefinition().getAttributeList()) { |
119 | attributeList.add(new SimpleAttribute(queryEventStream.getReferenceStreamId() + "_" + attribute.getName(), Expression.variable(queryEventStream.getReferenceStreamId(), attribute.getName()))); |
120 | } |
121 | } |
122 | } |
123 | |
124 | } |
125 | |
126 | private Map<String, StreamDefinition> getQueryStreamDefinitionMap(List<String> streamIds) { |
127 | Map<String, StreamDefinition> map = new HashMap<String, StreamDefinition>(); |
128 | for (String streamId : streamIds) { |
129 | map.put(streamId, streamDefinitionMap.get(streamId)); |
130 | } |
131 | return map; |
132 | } |
133 | |
134 | |
135 | // private void checkQuery(Query query) { |
136 | // List<String> streamIds = query.getInputStream().getStreamIds(); |
137 | // ////System.out.println(streamIds); |
138 | // } |
139 | |
140 | public InputHandler getInputHandler(String streamId) { |
141 | return new InputHandler(streamId, streamJunctionMap.get(streamId)); |
142 | } |
143 | |
144 | public void addCallback(String streamId, Callback callback) { |
145 | callback.setStreamId(streamId); |
146 | callback.setThreadPoolExecutor(threadPoolExecutor); |
147 | streamJunctionMap.get(streamId).addEventFlow(callback); |
148 | } |
149 | } |