Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Release Notes.
* Add unified release script (`tools/releasing/release.sh`) with two-step flow: `prepare-vote` and `vote-passed`.
* Fix an issue where `JDBCPluginConfig.Plugin.JDBC.SQL_BODY_MAX_LENGTH` was not honored by clickhouse-0.3.1 and clickhouse-0.3.2.x plugins.
- Add tracing support for vector-store retrieval operations.
* Fix agent lifecycle events: the Start event now carries the service instance name, and the Shutdown event is delivered on graceful JVM exit. `ServiceManager` prepares/starts higher-priority `BootService`s first and shuts them down last (matching `BootService#priority()`), and the shutdown event refreshes its gRPC deadline before sending.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void boot() {
}

public void shutdown() {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> {
try {
service.shutdown();
} catch (Throwable e) {
Expand Down Expand Up @@ -103,7 +103,7 @@ private Map<Class, BootService> loadAllServices() {
}

private void prepare() {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {
try {
service.prepare();
} catch (Throwable e) {
Expand All @@ -113,7 +113,7 @@ private void prepare() {
}

private void startup() {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> {
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> {
try {
service.boot();
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public class EventReportServiceClient implements BootService, GRPCChannelListene

private final AtomicBoolean reported = new AtomicBoolean();

private volatile boolean bootCompleted;

private Event.Builder startingEvent;

private EventServiceGrpc.EventServiceStub eventServiceStub;
private volatile EventServiceGrpc.EventServiceStub eventServiceStub;

private GRPCChannelStatus status;
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;

@Override
public void prepare() throws Throwable {
Expand Down Expand Up @@ -91,6 +93,7 @@ public void boot() throws Throwable {
@Override
public void onComplete() throws Throwable {
startingEvent.setEndTime(System.currentTimeMillis());
bootCompleted = true;

reportStartingEvent();
}
Expand All @@ -117,7 +120,8 @@ public void shutdown() throws Throwable {
)
.setLayer(EVENT_LAYER_NAME);

final StreamObserver<Event> collector = eventServiceStub.collect(new StreamObserver<Commands>() {
final EventServiceGrpc.EventServiceStub stub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS);
final StreamObserver<Event> collector = stub.collect(new StreamObserver<Commands>() {
@Override
public void onNext(final Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
Expand All @@ -143,25 +147,27 @@ public void onCompleted() {

@Override
public void statusChanged(final GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
final Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
eventServiceStub = EventServiceGrpc.newStub(channel);
}
this.status = status;

if (!CONNECTED.equals(status)) {
return;
if (CONNECTED.equals(status)) {
reportStartingEvent();
}

final Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
eventServiceStub = EventServiceGrpc.newStub(channel);
eventServiceStub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS);

reportStartingEvent();
}

private void reportStartingEvent() {
if (reported.compareAndSet(false, true)) {
if (!bootCompleted || !CONNECTED.equals(status)) {
return;
}
if (!reported.compareAndSet(false, true)) {
return;
}

final StreamObserver<Event> collector = eventServiceStub.collect(new StreamObserver<Commands>() {
final EventServiceGrpc.EventServiceStub stub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS);
final StreamObserver<Event> collector = stub.collect(new StreamObserver<Commands>() {
@Override
public void onNext(final Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.agent.core.boot;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class ServiceManagerOrderingTest {

private final List<String> prepareOrder = new ArrayList<>();
private final List<String> bootOrder = new ArrayList<>();
private final List<String> shutdownOrder = new ArrayList<>();

private class RecordingService implements BootService {
private final String name;
private final int priority;

private RecordingService(String name, int priority) {
this.name = name;
this.priority = priority;
}

@Override
public void prepare() {
prepareOrder.add(name);
}

@Override
public void boot() {
bootOrder.add(name);
}

@Override
public void onComplete() {
}

@Override
public void shutdown() {
shutdownOrder.add(name);
}

@Override
public int priority() {
return priority;
}
}

@After
public void tearDown() throws Exception {
setBootedServices(new LinkedHashMap<>());
}

@Test
public void higherPriorityBootsFirstAndShutsDownLast() throws Exception {
Map<Class, BootService> services = new LinkedHashMap<>();
services.put(Integer.class, new RecordingService("low", 0));
services.put(Long.class, new RecordingService("high", Integer.MAX_VALUE));
services.put(Short.class, new RecordingService("mid", 100));
setBootedServices(services);

invoke("prepare");
invoke("startup");
ServiceManager.INSTANCE.shutdown();

assertThat(prepareOrder, is(Arrays.asList("high", "mid", "low")));
assertThat(bootOrder, is(Arrays.asList("high", "mid", "low")));
assertThat(shutdownOrder, is(Arrays.asList("low", "mid", "high")));
}

private void setBootedServices(Map<Class, BootService> services) throws Exception {
Field field = ServiceManager.class.getDeclaredField("bootedServices");
field.setAccessible(true);
field.set(ServiceManager.INSTANCE, services);
}

private void invoke(String method) throws Exception {
Method m = ServiceManager.class.getDeclaredMethod(method);
m.setAccessible(true);
m.invoke(ServiceManager.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig.Plugin.Kafka;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;

Expand Down Expand Up @@ -183,14 +181,10 @@ public final KafkaProducer<String, Bytes> getProducer() {
return producer;
}

/**
* make kafka producer init later but before {@link GRPCChannelManager}
*
* @return priority value
*/
// Higher than the Kafka reporters sharing this producer, so the producer closes only after they stop.
@Override
public int priority() {
return ServiceManager.INSTANCE.findService(GRPCChannelManager.class).priority() - 1;
return 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.kafka;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -46,6 +47,11 @@ public void testAddListener() throws Exception {
assertEquals(counter.get(), times);
}

@Test
public void outranksKafkaReportersSoProducerClosesLast() {
assertTrue(new KafkaProducerManager().priority() > new KafkaTraceSegmentServiceClient().priority());
}

@Test
public void testFormatTopicNameThenRegister() {
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
Expand Down
Loading